From f5193f299b34f55e04cb516a1f760f49404e08a4 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 30 Nov 2019 21:02:17 +1100 Subject: [PATCH] Refactor beacon_chain_sim --- beacon_node/client/src/builder.rs | 16 +- lighthouse/environment/src/lib.rs | 2 +- tests/beacon_chain_sim/Cargo.toml | 3 + tests/beacon_chain_sim/src/main.rs | 189 +++++++++--------- .../beacon_chain_sim/src/simulated_network.rs | 125 ++++++++++++ tests/node_test_rig/src/lib.rs | 2 +- validator_client/src/lib.rs | 8 +- 7 files changed, 236 insertions(+), 109 deletions(-) create mode 100644 tests/beacon_chain_sim/src/simulated_network.rs diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 325043e653..6e3eb607b8 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -149,7 +149,7 @@ where })?; let context = runtime_context .ok_or_else(|| "beacon_chain_start_method requires a log".to_string())? - .service_context("beacon"); + .service_context("beacon".into()); let spec = chain_spec .ok_or_else(|| "beacon_chain_start_method requires a chain spec".to_string())?; @@ -270,7 +270,7 @@ where .runtime_context .as_ref() .ok_or_else(|| "libp2p_network requires a runtime_context")? - .service_context("network"); + .service_context("network".into()); let (network, network_send) = NetworkService::new(beacon_chain, config, &context.executor, context.log) @@ -296,7 +296,7 @@ where .runtime_context .as_ref() .ok_or_else(|| "http_server requires a runtime_context")? - .service_context("http"); + .service_context("http".into()); let network = self .libp2p_network .clone() @@ -336,7 +336,7 @@ where .runtime_context .as_ref() .ok_or_else(|| "peer_count_notifier requires a runtime_context")? - .service_context("peer_notifier"); + .service_context("peer_notifier".into()); let log = context.log.clone(); let log_2 = context.log.clone(); let network = self @@ -379,7 +379,7 @@ where .runtime_context .as_ref() .ok_or_else(|| "slot_notifier requires a runtime_context")? - .service_context("slot_notifier"); + .service_context("slot_notifier".into()); let log = context.log.clone(); let log_2 = log.clone(); let beacon_chain = self @@ -532,7 +532,7 @@ where .runtime_context .as_ref() .ok_or_else(|| "websocket_event_handler requires a runtime_context")? - .service_context("ws"); + .service_context("ws".into()); let (sender, exit_signal, listening_addr): ( WebSocketSender, @@ -582,7 +582,7 @@ where .runtime_context .as_ref() .ok_or_else(|| "disk_store requires a log".to_string())? - .service_context("freezer_db"); + .service_context("freezer_db".into()); let spec = self .chain_spec .clone() @@ -710,7 +710,7 @@ where .runtime_context .as_ref() .ok_or_else(|| "caching_eth1_backend requires a runtime_context")? - .service_context("eth1_rpc"); + .service_context("eth1_rpc".into()); let beacon_chain_builder = self .beacon_chain_builder .ok_or_else(|| "caching_eth1_backend requires a beacon_chain_builder")?; diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index cb3ef42c94..8aa130f7bf 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -149,7 +149,7 @@ impl RuntimeContext { /// Returns a sub-context of this context. /// /// The generated service will have the `service_name` in all it's logs. - pub fn service_context(&self, service_name: &'static str) -> Self { + pub fn service_context(&self, service_name: String) -> Self { Self { executor: self.executor.clone(), log: self.log.new(o!("service" => service_name)), diff --git a/tests/beacon_chain_sim/Cargo.toml b/tests/beacon_chain_sim/Cargo.toml index 37734a0305..e731ffe1ee 100644 --- a/tests/beacon_chain_sim/Cargo.toml +++ b/tests/beacon_chain_sim/Cargo.toml @@ -10,3 +10,6 @@ edition = "2018" node_test_rig = { path = "../node_test_rig" } types = { path = "../../eth2/types" } validator_client = { path = "../../validator_client" } +parking_lot = "0.9.0" +futures = "0.1.29" +tokio = "0.1.22" diff --git a/tests/beacon_chain_sim/src/main.rs b/tests/beacon_chain_sim/src/main.rs index 8f422ebb77..ad475be8c2 100644 --- a/tests/beacon_chain_sim/src/main.rs +++ b/tests/beacon_chain_sim/src/main.rs @@ -1,13 +1,16 @@ -// mod simulated_network; +mod simulated_network; +use futures::{future, stream, Future, IntoFuture, Stream}; use node_test_rig::{ - environment::{Environment, EnvironmentBuilder, RuntimeContext}, - testing_client_config, ClientConfig, ClientGenesis, LocalBeaconNode, LocalValidatorClient, - ProductionClient, ValidatorConfig, + environment::EnvironmentBuilder, testing_client_config, ClientGenesis, LocalBeaconNode, + LocalValidatorClient, ProductionClient, ValidatorConfig, }; -use std::time::{SystemTime, UNIX_EPOCH}; -use types::EthSpec; +use simulated_network::LocalNetwork; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use tokio::timer::Delay; +use types::{Epoch, EthSpec, MinimalEthSpec}; +pub type E = MinimalEthSpec; pub type BeaconNode = LocalBeaconNode>; pub type ValidatorClient = LocalValidatorClient; @@ -15,122 +18,118 @@ fn main() { let nodes = 4; let validators_per_node = 64 / nodes; - match simulation(nodes, validators_per_node) { + match async_sim(nodes, validators_per_node, 4) { Ok(()) => println!("Simulation exited successfully"), Err(e) => println!("Simulation exited with error: {}", e), } } -fn simulation(num_nodes: usize, validators_per_node: usize) -> Result<(), String> { - if num_nodes < 1 { - return Err("Must have at least one node".into()); - } - +fn async_sim( + node_count: usize, + validators_per_node: usize, + speed_up_factor: u64, +) -> Result<(), String> { let mut env = EnvironmentBuilder::minimal() .async_logger("debug")? .multi_threaded_tokio_runtime()? .build()?; - env.eth2_config.spec.milliseconds_per_slot = 2_000; - - let mut base_config = testing_client_config(); + env.eth2_config.spec.milliseconds_per_slot = + env.eth2_config.spec.milliseconds_per_slot / speed_up_factor; let now = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("should get system time") .as_secs(); - base_config.genesis = ClientGenesis::Interop { + + let mut beacon_config = testing_client_config(); + beacon_config.genesis = ClientGenesis::Interop { genesis_time: now, - validator_count: num_nodes * validators_per_node, + validator_count: node_count * validators_per_node, }; - let boot_node = - BeaconNode::production(env.service_context("boot_node".into()), base_config.clone()); + let slot_duration = Duration::from_millis(env.eth2_config.spec.milliseconds_per_slot); - let mut nodes = (1..num_nodes) - .map(|i| { - let context = env.service_context(format!("node_{}", i)); - new_with_bootnode_via_enr(context, &boot_node, base_config.clone()) + let network = LocalNetwork::new(env.core_context(), beacon_config.clone())?; + + let network_1 = network.clone(); + let network_2 = network.clone(); + let network_3 = network.clone(); + + let future = future::ok(()) + .and_then(move |()| { + let network = network_1; + + for _ in 0..node_count - 1 { + network.add_beacon_node(beacon_config.clone())?; + } + + Ok(()) }) - .collect::>(); + .and_then(move |()| { + let network = network_2; - let _validators = nodes - .iter() - .enumerate() - .map(|(i, node)| { - let mut context = env.service_context(format!("validator_{}", i)); + stream::unfold(0..node_count, move |mut iter| { + iter.next().map(|i| { + let indices = (i * validators_per_node..(i + 1) * validators_per_node) + .collect::>(); - // Pull the spec from the beacon node's beacon chain, in case there were some changes - // to the spec after the node booted. - context.eth2_config.spec = node - .client - .beacon_chain() - .expect("should have beacon chain") - .spec - .clone(); - - let context = env.service_context(format!("validator_{}", i)); - - let indices = - (i * validators_per_node..(i + 1) * validators_per_node).collect::>(); - new_validator_client( - &mut env, - context, - node, - ValidatorConfig::default(), - &indices, - ) + network + .add_validator_client(ValidatorConfig::default(), i, indices) + .map(|()| ((), iter)) + }) + }) + .collect() + .map(|_| ()) }) - .collect::>(); + .and_then(move |_| { + epoch_delay(Epoch::new(4), slot_duration, E::slots_per_epoch()) + .and_then(|()| verify_all_finalized_at(network_3, Epoch::new(2))) + }); - nodes.insert(0, boot_node); - - env.block_until_ctrl_c()?; - - Ok(()) + env.runtime().block_on(future) } -// TODO: this function does not result in nodes connecting to each other. This is a bug due to -// using a 0 port for discovery. Age is fixing it. -fn new_with_bootnode_via_enr( - context: RuntimeContext, - boot_node: &BeaconNode, - base_config: ClientConfig, -) -> BeaconNode { - let mut config = base_config; - config.network.boot_nodes.push( - boot_node - .client - .enr() - .expect("bootnode must have a network"), - ); +/// Delays for `epochs`, plus half a slot extra. +fn epoch_delay( + epochs: Epoch, + slot_duration: Duration, + slots_per_epoch: u64, +) -> impl Future { + let duration = slot_duration * (epochs.as_u64() * slots_per_epoch) as u32 + slot_duration / 2; - BeaconNode::production(context, config) + Delay::new(Instant::now() + duration).map_err(|e| format!("Epoch delay failed: {:?}", e)) } -// Note: this function will block until the validator can connect to the beaco node. It is -// recommended to ensure that the beacon node is running first. -fn new_validator_client( - env: &mut Environment, - context: RuntimeContext, - beacon_node: &BeaconNode, - base_config: ValidatorConfig, - keypair_indices: &[usize], -) -> LocalValidatorClient { - let mut config = base_config; - - let socket_addr = beacon_node - .client - .http_listen_addr() - .expect("Must have http started"); - - config.http_server = format!("http://{}:{}", socket_addr.ip(), socket_addr.port()); - - env.runtime() - .block_on(LocalValidatorClient::production_with_insecure_keypairs( - context, - config, - keypair_indices, - )) - .expect("should start validator") +fn verify_all_finalized_at( + network: LocalNetwork, + epoch: Epoch, +) -> impl Future { + network + .remote_nodes() + .into_future() + .and_then(|remote_nodes| { + stream::unfold(remote_nodes.into_iter(), |mut iter| { + iter.next().map(|remote_node| { + remote_node + .http + .beacon() + .get_head() + .map(|head| head.finalized_slot.epoch(E::slots_per_epoch())) + .map(|epoch| (epoch, iter)) + .map_err(|e| format!("Get head via http failed: {:?}", e)) + }) + }) + .collect() + }) + .and_then(move |epochs| { + if epochs.iter().any(|node_epoch| *node_epoch != epoch) { + Err(format!( + "Nodes are not finalized at epoch {}. Finalized epochs: {:?}", + epoch, epochs + )) + } else { + Ok(()) + } + }) } diff --git a/tests/beacon_chain_sim/src/simulated_network.rs b/tests/beacon_chain_sim/src/simulated_network.rs new file mode 100644 index 0000000000..4db2239053 --- /dev/null +++ b/tests/beacon_chain_sim/src/simulated_network.rs @@ -0,0 +1,125 @@ +use crate::{BeaconNode, ValidatorClient}; +use futures::{Future, IntoFuture}; +use node_test_rig::{ + environment::RuntimeContext, ClientConfig, LocalValidatorClient, RemoteBeaconNode, + ValidatorConfig, +}; +use parking_lot::RwLock; +use std::ops::Deref; +use std::sync::Arc; +use types::EthSpec; + +pub struct Inner { + context: RuntimeContext, + beacon_nodes: RwLock>>, + validator_clients: RwLock>>, +} + +pub struct LocalNetwork { + inner: Arc>, +} + +impl Clone for LocalNetwork { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl Deref for LocalNetwork { + type Target = Inner; + + fn deref(&self) -> &Self::Target { + self.inner.deref() + } +} + +impl LocalNetwork { + /// Creates a new network with a single `BeaconNode`. + pub fn new(context: RuntimeContext, beacon_config: ClientConfig) -> Result { + let beacon_nodes = vec![BeaconNode::production( + context.service_context("boot_node".into()), + beacon_config, + )]; + + Ok(Self { + inner: Arc::new(Inner { + context, + beacon_nodes: RwLock::new(beacon_nodes), + validator_clients: RwLock::new(vec![]), + }), + }) + } + + pub fn add_beacon_node(&self, mut beacon_config: ClientConfig) -> Result<(), String> { + self.beacon_nodes + .read() + .first() + .map(|boot_node| { + beacon_config.network.boot_nodes.push( + boot_node + .client + .enr() + .expect("bootnode must have a network"), + ); + }) + .ok_or_else(|| "No boot node".to_string())?; + + let index = self.beacon_nodes.read().len(); + + let beacon_node = BeaconNode::production( + self.context.service_context(format!("node_{}", index)), + beacon_config, + ); + + self.beacon_nodes.write().push(beacon_node); + + Ok(()) + } + + pub fn add_validator_client( + &self, + mut validator_config: ValidatorConfig, + beacon_node: usize, + keypair_indices: Vec, + ) -> impl Future { + let index = self.validator_clients.read().len(); + let context = self.context.service_context(format!("validator_{}", index)); + let self_1 = self.clone(); + + self.beacon_nodes + .read() + .get(beacon_node) + .map(move |beacon_node| { + let socket_addr = beacon_node + .client + .http_listen_addr() + .expect("Must have http started"); + + validator_config.http_server = + format!("http://{}:{}", socket_addr.ip(), socket_addr.port()); + + validator_config + }) + .ok_or_else(|| format!("No beacon node for index {}", beacon_node)) + .into_future() + .and_then(move |validator_config| { + LocalValidatorClient::production_with_insecure_keypairs( + context, + validator_config, + &keypair_indices, + ) + }) + .map(move |validator_client| self_1.validator_clients.write().push(validator_client)) + } + + pub fn remote_nodes(&self) -> Result>, String> { + let beacon_nodes = self.beacon_nodes.read(); + + beacon_nodes + .iter() + .map(|beacon_node| beacon_node.remote_node()) + .collect() + } +} diff --git a/tests/node_test_rig/src/lib.rs b/tests/node_test_rig/src/lib.rs index d194801eed..603b0eff89 100644 --- a/tests/node_test_rig/src/lib.rs +++ b/tests/node_test_rig/src/lib.rs @@ -5,7 +5,6 @@ use beacon_node::{beacon_chain::BeaconChainTypes, Client, ProductionBeaconNode}; use environment::RuntimeContext; use futures::Future; -use remote_beacon_node::RemoteBeaconNode; use std::path::PathBuf; use std::time::{SystemTime, UNIX_EPOCH}; use tempdir::TempDir; @@ -14,6 +13,7 @@ use validator_client::{KeySource, ProductionValidatorClient}; pub use beacon_node::{ClientConfig, ClientGenesis, ProductionClient}; pub use environment; +pub use remote_beacon_node::RemoteBeaconNode; pub use validator_client::Config as ValidatorConfig; /// Provids a beacon node that is running in the current process on a given tokio executor (it diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index c37aa2e15e..b727dcb16b 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -170,7 +170,7 @@ impl ProductionValidatorClient { let fork_service = ForkServiceBuilder::new() .slot_clock(slot_clock.clone()) .beacon_node(beacon_node.clone()) - .runtime_context(context.service_context("fork")) + .runtime_context(context.service_context("fork".into())) .build()?; let validator_store: ValidatorStore = @@ -207,7 +207,7 @@ impl ProductionValidatorClient { .slot_clock(slot_clock.clone()) .validator_store(validator_store.clone()) .beacon_node(beacon_node.clone()) - .runtime_context(context.service_context("duties")) + .runtime_context(context.service_context("duties".into())) .build()?; let block_service = BlockServiceBuilder::new() @@ -215,7 +215,7 @@ impl ProductionValidatorClient { .slot_clock(slot_clock.clone()) .validator_store(validator_store.clone()) .beacon_node(beacon_node.clone()) - .runtime_context(context.service_context("block")) + .runtime_context(context.service_context("block".into())) .build()?; let attestation_service = AttestationServiceBuilder::new() @@ -223,7 +223,7 @@ impl ProductionValidatorClient { .slot_clock(slot_clock) .validator_store(validator_store) .beacon_node(beacon_node) - .runtime_context(context.service_context("attestation")) + .runtime_context(context.service_context("attestation".into())) .build()?; Ok(Self {