diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index c160081456..f8375c3d2d 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -63,6 +63,7 @@ pub struct ClientBuilder { libp2p_network: Option>>, libp2p_network_send: Option>, http_listen_addr: Option, + grpc_listen_addr: Option<(String, u16)>, websocket_listen_addr: Option, eth_spec_instance: T::EthSpec, } @@ -94,6 +95,7 @@ where libp2p_network: None, libp2p_network_send: None, http_listen_addr: None, + grpc_listen_addr: None, websocket_listen_addr: None, eth_spec_instance, } @@ -283,15 +285,17 @@ where .clone() .ok_or_else(|| "grpc_server requires a libp2p network")?; - let exit_signal = rpc::start_server( + let (exit_signal, listen_addr) = rpc::start_server( config, &context.executor, network_send, beacon_chain, context.log, - ); + ) + .map_err(|e| format!("Failed to start gRPC server: {}", e))?; self.exit_signals.push(exit_signal); + self.grpc_listen_addr = Some(listen_addr); Ok(self) } @@ -455,6 +459,7 @@ where beacon_chain: self.beacon_chain, libp2p_network: self.libp2p_network, http_listen_addr: self.http_listen_addr, + grpc_listen_addr: self.grpc_listen_addr, websocket_listen_addr: self.websocket_listen_addr, _exit_signals: self.exit_signals, } diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index f0ed63e485..4da474ea1a 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -25,6 +25,7 @@ pub struct Client { libp2p_network: Option>>, http_listen_addr: Option, websocket_listen_addr: Option, + grpc_listen_addr: Option<(String, u16)>, /// Exit signals will "fire" when dropped, causing each service to exit gracefully. _exit_signals: Vec, } @@ -40,6 +41,11 @@ impl Client { self.http_listen_addr } + /// Returns the address of the client's gRPC API server, if it was started. + pub fn grpc_listen_addr(&self) -> Option<(String, u16)> { + self.grpc_listen_addr.clone() + } + /// Returns the address of the client's WebSocket API server, if it was started. pub fn websocket_listen_addr(&self) -> Option { self.websocket_listen_addr diff --git a/beacon_node/rpc/src/lib.rs b/beacon_node/rpc/src/lib.rs index 3425eeeac2..b08701df67 100644 --- a/beacon_node/rpc/src/lib.rs +++ b/beacon_node/rpc/src/lib.rs @@ -28,7 +28,7 @@ pub fn start_server( network_chan: mpsc::UnboundedSender, beacon_chain: Arc>, log: slog::Logger, -) -> exit_future::Signal { +) -> Result<(exit_future::Signal, (String, u16)), String> { let env = Arc::new(Environment::new(1)); // build a channel to kill the rpc server @@ -76,16 +76,24 @@ pub fn start_server( .build() .unwrap(); + server.start(); + + for &(ref host, port) in server.bind_addrs() { + info!( + log, + "gRPC API started"; + "port" => port, + "host" => host, + ); + } + + let listen_addr = server + .bind_addrs() + .first() + .ok_or_else(|| "gRPC server is not listening on any ports".to_string())? + .clone(); + let spawn_rpc = { - server.start(); - for &(ref host, port) in server.bind_addrs() { - info!( - log, - "gRPC API started"; - "port" => port, - "host" => host, - ); - } rpc_exit.and_then(move |_| { info!(log, "RPC Server shutting down"); server @@ -97,5 +105,6 @@ pub fn start_server( }) }; executor.spawn(spawn_rpc); - rpc_exit_signal + + Ok((rpc_exit_signal, listen_addr)) } diff --git a/tests/beacon_chain_sim/Cargo.toml b/tests/beacon_chain_sim/Cargo.toml index 8ce05e2d66..37734a0305 100644 --- a/tests/beacon_chain_sim/Cargo.toml +++ b/tests/beacon_chain_sim/Cargo.toml @@ -9,3 +9,4 @@ edition = "2018" [dependencies] node_test_rig = { path = "../node_test_rig" } types = { path = "../../eth2/types" } +validator_client = { path = "../../validator_client" } diff --git a/tests/beacon_chain_sim/src/main.rs b/tests/beacon_chain_sim/src/main.rs index 0915bd372f..fba4995f78 100644 --- a/tests/beacon_chain_sim/src/main.rs +++ b/tests/beacon_chain_sim/src/main.rs @@ -1,19 +1,23 @@ use node_test_rig::{ environment::{EnvironmentBuilder, RuntimeContext}, - testing_client_config, ClientConfig, LocalBeaconNode, ProductionClient, + testing_client_config, ClientConfig, LocalBeaconNode, LocalValidatorClient, ProductionClient, + ValidatorConfig, }; use types::EthSpec; pub type BeaconNode = LocalBeaconNode>; fn main() { - match simulation(4) { + let nodes = 4; + let validators_per_node = 64 / nodes; + + match simulation(nodes, validators_per_node) { Ok(()) => println!("Simulation exited successfully"), Err(e) => println!("Simulation exited with error: {}", e), } } -fn simulation(num_nodes: usize) -> Result<(), String> { +fn simulation(num_nodes: usize, validators_per_node: usize) -> Result<(), String> { if num_nodes < 1 { return Err("Must have at least one node".into()); } @@ -28,19 +32,48 @@ fn simulation(num_nodes: usize) -> Result<(), String> { let boot_node = BeaconNode::production(env.service_context("boot_node".into()), base_config.clone()); - let nodes = (1..num_nodes) + let mut nodes = (1..num_nodes) .map(|i| { let context = env.service_context(format!("node_{}", i)); new_with_bootnode_via_enr(context, &boot_node, base_config.clone()) }) .collect::>(); + let validators = nodes + .iter() + .enumerate() + .map(|(i, node)| { + let mut context = env.service_context(format!("validator_{}", i)); + + // Pull the spec from the beacon node's beacon chain, in case there were some changes + // to the spec after the node booted. + context.eth2_config.spec = node + .client + .beacon_chain() + .expect("should have beacon chain") + .spec + .clone(); + + let indices = + (i * validators_per_node..(i + 1) * validators_per_node).collect::>(); + new_validator_client( + env.service_context(format!("validator_{}", i)), + node, + ValidatorConfig::default(), + &indices, + ) + }) + .collect::>(); + + nodes.insert(0, boot_node); + env.block_until_ctrl_c()?; Ok(()) } -// TODO: this function does not result in nodes connecting to each other. Age to investigate? +// TODO: this function does not result in nodes connecting to each other. This is a bug due to +// using a 0 port for discovery. Age is fixing it. fn new_with_bootnode_via_enr( context: RuntimeContext, boot_node: &BeaconNode, @@ -75,3 +108,22 @@ fn new_with_bootnode_via_multiaddr( BeaconNode::production(context, config) } + +fn new_validator_client( + context: RuntimeContext, + beacon_node: &BeaconNode, + base_config: ValidatorConfig, + keypair_indices: &[usize], +) -> LocalValidatorClient { + let mut config = base_config; + + let (grpc_endpoint, grpc_port) = beacon_node + .client + .grpc_listen_addr() + .expect("Must have gRPC started"); + + config.server = grpc_endpoint; + config.server_grpc_port = grpc_port; + + LocalValidatorClient::production_with_insecure_keypairs(context, config, keypair_indices) +} diff --git a/tests/node_test_rig/Cargo.toml b/tests/node_test_rig/Cargo.toml index 7bb19db9c6..c8e1ebcc68 100644 --- a/tests/node_test_rig/Cargo.toml +++ b/tests/node_test_rig/Cargo.toml @@ -16,3 +16,4 @@ serde = "1.0" futures = "0.1.25" genesis = { path = "../../beacon_node/genesis" } remote_beacon_node = { path = "../../eth2/utils/remote_beacon_node" } +validator_client = { path = "../../validator_client" } diff --git a/tests/node_test_rig/src/lib.rs b/tests/node_test_rig/src/lib.rs index 060215df29..1499cff31d 100644 --- a/tests/node_test_rig/src/lib.rs +++ b/tests/node_test_rig/src/lib.rs @@ -3,13 +3,17 @@ use environment::RuntimeContext; use futures::Future; use remote_beacon_node::RemoteBeaconNode; use std::path::PathBuf; +use std::time::{SystemTime, UNIX_EPOCH}; use tempdir::TempDir; use types::EthSpec; +use validator_client::{validator_directory::ValidatorDirectoryBuilder, ProductionValidatorClient}; pub use beacon_node::{ClientConfig, ProductionClient}; pub use environment; +pub use validator_client::Config as ValidatorConfig; -/// Provides a beacon node that is running in the current process. Useful for testing purposes. +/// Provides a beacon node that is running in the current process (i.e., local). Useful for testing +/// purposes. pub struct LocalBeaconNode { pub client: T, pub datadir: TempDir, @@ -56,10 +60,67 @@ pub fn testing_client_config() -> ClientConfig { client_config.rest_api.port = 0; client_config.websocket_server.port = 0; + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("should get system time") + .as_secs(); + client_config.genesis = ClientGenesis::Interop { validator_count: 8, - genesis_time: 13_371_337, + genesis_time: now, }; client_config } + +pub struct LocalValidatorClient { + pub client: ProductionValidatorClient, + pub datadir: TempDir, +} + +impl LocalValidatorClient { + pub fn production_with_insecure_keypairs( + context: RuntimeContext, + config: ValidatorConfig, + keypair_indices: &[usize], + ) -> Self { + // Creates a temporary directory that will be deleted once this `TempDir` is dropped. + let datadir = TempDir::new("lighthouse-beacon-node") + .expect("should create temp directory for client datadir"); + + keypair_indices.iter().for_each(|i| { + ValidatorDirectoryBuilder::default() + .spec(context.eth2_config.spec.clone()) + .full_deposit_amount() + .expect("should set full deposit amount") + .insecure_keypairs(*i) + .create_directory(PathBuf::from(datadir.path())) + .expect("should create directory") + .write_keypair_files() + .expect("should write keypair files") + .write_eth1_data_file() + .expect("should write eth1 data file") + .build() + .expect("should build dir"); + }); + + Self::new(context, config, datadir) + } + + pub fn production(context: RuntimeContext, config: ValidatorConfig) -> Self { + // Creates a temporary directory that will be deleted once this `TempDir` is dropped. + let datadir = TempDir::new("lighthouse-validator") + .expect("should create temp directory for client datadir"); + + Self::new(context, config, datadir) + } + + fn new(context: RuntimeContext, mut config: ValidatorConfig, datadir: TempDir) -> Self { + config.data_dir = datadir.path().into(); + + let client = + ProductionValidatorClient::new(context, config).expect("should start validator client"); + + Self { client, datadir } + } +} diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 930f35c32a..d9b6f40443 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -43,20 +43,36 @@ pub struct ProductionValidatorClient { impl ProductionValidatorClient { /// Instantiates the validator client, _without_ starting the timers to trigger block /// and attestation production. - pub fn new_from_cli(context: RuntimeContext, matches: &ArgMatches) -> Result { + pub fn new_from_cli( + mut context: RuntimeContext, + matches: &ArgMatches, + ) -> Result { let mut log = context.log.clone(); - let (client_config, eth2_config) = get_configs(&matches, &mut log) + let (config, eth2_config) = get_configs(&matches, &mut log) .map_err(|e| format!("Unable to initialize config: {}", e))?; + // TODO: the eth2 config in the env is being completely ignored. + // + // See https://github.com/sigp/lighthouse/issues/602 + context.eth2_config = eth2_config; + + Self::new(context, config) + } + + /// Instantiates the validator client, _without_ starting the timers to trigger block + /// and attestation production. + pub fn new(context: RuntimeContext, config: Config) -> Result { + let log = context.log.clone(); + info!( log, "Starting validator client"; - "datadir" => client_config.full_data_dir().expect("Unable to find datadir").to_str(), + "datadir" => config.full_data_dir().expect("Unable to find datadir").to_str(), ); let service: Service = - Service::initialize_service(client_config, eth2_config, log.clone()) + Service::initialize_service(config, context.eth2_config.clone(), log.clone()) .map_err(|e| e.to_string())?; Ok(Self { diff --git a/validator_client/src/service.rs b/validator_client/src/service.rs index 4d304ddd2c..c733ef31ed 100644 --- a/validator_client/src/service.rs +++ b/validator_client/src/service.rs @@ -161,6 +161,12 @@ impl Service { // Load generated keypairs let keypairs = Arc::new(client_config.fetch_keys(&log)?); + info!( + log, + "Keypairs loaded"; + "local_validator_count" => keypairs.len() + ); + let slots_per_epoch = E::slots_per_epoch(); // TODO: keypairs are randomly generated; they should be loaded from a file or generated.