Expand beacon_chain_sim

This commit is contained in:
Paul Hauner
2019-11-20 14:40:28 +11:00
parent 90d63a46c7
commit 40a0bd0544
9 changed files with 181 additions and 24 deletions

View File

@@ -63,6 +63,7 @@ pub struct ClientBuilder<T: BeaconChainTypes> {
libp2p_network: Option<Arc<NetworkService<T>>>, libp2p_network: Option<Arc<NetworkService<T>>>,
libp2p_network_send: Option<UnboundedSender<NetworkMessage>>, libp2p_network_send: Option<UnboundedSender<NetworkMessage>>,
http_listen_addr: Option<SocketAddr>, http_listen_addr: Option<SocketAddr>,
grpc_listen_addr: Option<(String, u16)>,
websocket_listen_addr: Option<SocketAddr>, websocket_listen_addr: Option<SocketAddr>,
eth_spec_instance: T::EthSpec, eth_spec_instance: T::EthSpec,
} }
@@ -94,6 +95,7 @@ where
libp2p_network: None, libp2p_network: None,
libp2p_network_send: None, libp2p_network_send: None,
http_listen_addr: None, http_listen_addr: None,
grpc_listen_addr: None,
websocket_listen_addr: None, websocket_listen_addr: None,
eth_spec_instance, eth_spec_instance,
} }
@@ -283,15 +285,17 @@ where
.clone() .clone()
.ok_or_else(|| "grpc_server requires a libp2p network")?; .ok_or_else(|| "grpc_server requires a libp2p network")?;
let exit_signal = rpc::start_server( let (exit_signal, listen_addr) = rpc::start_server(
config, config,
&context.executor, &context.executor,
network_send, network_send,
beacon_chain, beacon_chain,
context.log, context.log,
); )
.map_err(|e| format!("Failed to start gRPC server: {}", e))?;
self.exit_signals.push(exit_signal); self.exit_signals.push(exit_signal);
self.grpc_listen_addr = Some(listen_addr);
Ok(self) Ok(self)
} }
@@ -455,6 +459,7 @@ where
beacon_chain: self.beacon_chain, beacon_chain: self.beacon_chain,
libp2p_network: self.libp2p_network, libp2p_network: self.libp2p_network,
http_listen_addr: self.http_listen_addr, http_listen_addr: self.http_listen_addr,
grpc_listen_addr: self.grpc_listen_addr,
websocket_listen_addr: self.websocket_listen_addr, websocket_listen_addr: self.websocket_listen_addr,
_exit_signals: self.exit_signals, _exit_signals: self.exit_signals,
} }

View File

@@ -25,6 +25,7 @@ pub struct Client<T: BeaconChainTypes> {
libp2p_network: Option<Arc<NetworkService<T>>>, libp2p_network: Option<Arc<NetworkService<T>>>,
http_listen_addr: Option<SocketAddr>, http_listen_addr: Option<SocketAddr>,
websocket_listen_addr: Option<SocketAddr>, websocket_listen_addr: Option<SocketAddr>,
grpc_listen_addr: Option<(String, u16)>,
/// Exit signals will "fire" when dropped, causing each service to exit gracefully. /// Exit signals will "fire" when dropped, causing each service to exit gracefully.
_exit_signals: Vec<Signal>, _exit_signals: Vec<Signal>,
} }
@@ -40,6 +41,11 @@ impl<T: BeaconChainTypes> Client<T> {
self.http_listen_addr self.http_listen_addr
} }
/// Returns the address of the client's gRPC API server, if it was started.
pub fn grpc_listen_addr(&self) -> Option<(String, u16)> {
self.grpc_listen_addr.clone()
}
/// Returns the address of the client's WebSocket API server, if it was started. /// Returns the address of the client's WebSocket API server, if it was started.
pub fn websocket_listen_addr(&self) -> Option<SocketAddr> { pub fn websocket_listen_addr(&self) -> Option<SocketAddr> {
self.websocket_listen_addr self.websocket_listen_addr

View File

@@ -28,7 +28,7 @@ pub fn start_server<T: BeaconChainTypes>(
network_chan: mpsc::UnboundedSender<NetworkMessage>, network_chan: mpsc::UnboundedSender<NetworkMessage>,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
log: slog::Logger, log: slog::Logger,
) -> exit_future::Signal { ) -> Result<(exit_future::Signal, (String, u16)), String> {
let env = Arc::new(Environment::new(1)); let env = Arc::new(Environment::new(1));
// build a channel to kill the rpc server // build a channel to kill the rpc server
@@ -76,16 +76,24 @@ pub fn start_server<T: BeaconChainTypes>(
.build() .build()
.unwrap(); .unwrap();
server.start();
for &(ref host, port) in server.bind_addrs() {
info!(
log,
"gRPC API started";
"port" => port,
"host" => host,
);
}
let listen_addr = server
.bind_addrs()
.first()
.ok_or_else(|| "gRPC server is not listening on any ports".to_string())?
.clone();
let spawn_rpc = { let spawn_rpc = {
server.start();
for &(ref host, port) in server.bind_addrs() {
info!(
log,
"gRPC API started";
"port" => port,
"host" => host,
);
}
rpc_exit.and_then(move |_| { rpc_exit.and_then(move |_| {
info!(log, "RPC Server shutting down"); info!(log, "RPC Server shutting down");
server server
@@ -97,5 +105,6 @@ pub fn start_server<T: BeaconChainTypes>(
}) })
}; };
executor.spawn(spawn_rpc); executor.spawn(spawn_rpc);
rpc_exit_signal
Ok((rpc_exit_signal, listen_addr))
} }

View File

@@ -9,3 +9,4 @@ edition = "2018"
[dependencies] [dependencies]
node_test_rig = { path = "../node_test_rig" } node_test_rig = { path = "../node_test_rig" }
types = { path = "../../eth2/types" } types = { path = "../../eth2/types" }
validator_client = { path = "../../validator_client" }

View File

@@ -1,19 +1,23 @@
use node_test_rig::{ use node_test_rig::{
environment::{EnvironmentBuilder, RuntimeContext}, environment::{EnvironmentBuilder, RuntimeContext},
testing_client_config, ClientConfig, LocalBeaconNode, ProductionClient, testing_client_config, ClientConfig, LocalBeaconNode, LocalValidatorClient, ProductionClient,
ValidatorConfig,
}; };
use types::EthSpec; use types::EthSpec;
pub type BeaconNode<E> = LocalBeaconNode<ProductionClient<E>>; pub type BeaconNode<E> = LocalBeaconNode<ProductionClient<E>>;
fn main() { fn main() {
match simulation(4) { let nodes = 4;
let validators_per_node = 64 / nodes;
match simulation(nodes, validators_per_node) {
Ok(()) => println!("Simulation exited successfully"), Ok(()) => println!("Simulation exited successfully"),
Err(e) => println!("Simulation exited with error: {}", e), Err(e) => println!("Simulation exited with error: {}", e),
} }
} }
fn simulation(num_nodes: usize) -> Result<(), String> { fn simulation(num_nodes: usize, validators_per_node: usize) -> Result<(), String> {
if num_nodes < 1 { if num_nodes < 1 {
return Err("Must have at least one node".into()); return Err("Must have at least one node".into());
} }
@@ -28,19 +32,48 @@ fn simulation(num_nodes: usize) -> Result<(), String> {
let boot_node = let boot_node =
BeaconNode::production(env.service_context("boot_node".into()), base_config.clone()); BeaconNode::production(env.service_context("boot_node".into()), base_config.clone());
let nodes = (1..num_nodes) let mut nodes = (1..num_nodes)
.map(|i| { .map(|i| {
let context = env.service_context(format!("node_{}", i)); let context = env.service_context(format!("node_{}", i));
new_with_bootnode_via_enr(context, &boot_node, base_config.clone()) new_with_bootnode_via_enr(context, &boot_node, base_config.clone())
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let validators = nodes
.iter()
.enumerate()
.map(|(i, node)| {
let mut context = env.service_context(format!("validator_{}", i));
// Pull the spec from the beacon node's beacon chain, in case there were some changes
// to the spec after the node booted.
context.eth2_config.spec = node
.client
.beacon_chain()
.expect("should have beacon chain")
.spec
.clone();
let indices =
(i * validators_per_node..(i + 1) * validators_per_node).collect::<Vec<_>>();
new_validator_client(
env.service_context(format!("validator_{}", i)),
node,
ValidatorConfig::default(),
&indices,
)
})
.collect::<Vec<_>>();
nodes.insert(0, boot_node);
env.block_until_ctrl_c()?; env.block_until_ctrl_c()?;
Ok(()) Ok(())
} }
// TODO: this function does not result in nodes connecting to each other. Age to investigate? // TODO: this function does not result in nodes connecting to each other. This is a bug due to
// using a 0 port for discovery. Age is fixing it.
fn new_with_bootnode_via_enr<E: EthSpec>( fn new_with_bootnode_via_enr<E: EthSpec>(
context: RuntimeContext<E>, context: RuntimeContext<E>,
boot_node: &BeaconNode<E>, boot_node: &BeaconNode<E>,
@@ -75,3 +108,22 @@ fn new_with_bootnode_via_multiaddr<E: EthSpec>(
BeaconNode::production(context, config) BeaconNode::production(context, config)
} }
fn new_validator_client<E: EthSpec>(
context: RuntimeContext<E>,
beacon_node: &BeaconNode<E>,
base_config: ValidatorConfig,
keypair_indices: &[usize],
) -> LocalValidatorClient<E> {
let mut config = base_config;
let (grpc_endpoint, grpc_port) = beacon_node
.client
.grpc_listen_addr()
.expect("Must have gRPC started");
config.server = grpc_endpoint;
config.server_grpc_port = grpc_port;
LocalValidatorClient::production_with_insecure_keypairs(context, config, keypair_indices)
}

View File

@@ -16,3 +16,4 @@ serde = "1.0"
futures = "0.1.25" futures = "0.1.25"
genesis = { path = "../../beacon_node/genesis" } genesis = { path = "../../beacon_node/genesis" }
remote_beacon_node = { path = "../../eth2/utils/remote_beacon_node" } remote_beacon_node = { path = "../../eth2/utils/remote_beacon_node" }
validator_client = { path = "../../validator_client" }

View File

@@ -3,13 +3,17 @@ use environment::RuntimeContext;
use futures::Future; use futures::Future;
use remote_beacon_node::RemoteBeaconNode; use remote_beacon_node::RemoteBeaconNode;
use std::path::PathBuf; use std::path::PathBuf;
use std::time::{SystemTime, UNIX_EPOCH};
use tempdir::TempDir; use tempdir::TempDir;
use types::EthSpec; use types::EthSpec;
use validator_client::{validator_directory::ValidatorDirectoryBuilder, ProductionValidatorClient};
pub use beacon_node::{ClientConfig, ProductionClient}; pub use beacon_node::{ClientConfig, ProductionClient};
pub use environment; pub use environment;
pub use validator_client::Config as ValidatorConfig;
/// Provides a beacon node that is running in the current process. Useful for testing purposes. /// Provides a beacon node that is running in the current process (i.e., local). Useful for testing
/// purposes.
pub struct LocalBeaconNode<T> { pub struct LocalBeaconNode<T> {
pub client: T, pub client: T,
pub datadir: TempDir, pub datadir: TempDir,
@@ -56,10 +60,67 @@ pub fn testing_client_config() -> ClientConfig {
client_config.rest_api.port = 0; client_config.rest_api.port = 0;
client_config.websocket_server.port = 0; client_config.websocket_server.port = 0;
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("should get system time")
.as_secs();
client_config.genesis = ClientGenesis::Interop { client_config.genesis = ClientGenesis::Interop {
validator_count: 8, validator_count: 8,
genesis_time: 13_371_337, genesis_time: now,
}; };
client_config client_config
} }
pub struct LocalValidatorClient<T: EthSpec> {
pub client: ProductionValidatorClient<T>,
pub datadir: TempDir,
}
impl<E: EthSpec> LocalValidatorClient<E> {
pub fn production_with_insecure_keypairs(
context: RuntimeContext<E>,
config: ValidatorConfig,
keypair_indices: &[usize],
) -> Self {
// Creates a temporary directory that will be deleted once this `TempDir` is dropped.
let datadir = TempDir::new("lighthouse-beacon-node")
.expect("should create temp directory for client datadir");
keypair_indices.iter().for_each(|i| {
ValidatorDirectoryBuilder::default()
.spec(context.eth2_config.spec.clone())
.full_deposit_amount()
.expect("should set full deposit amount")
.insecure_keypairs(*i)
.create_directory(PathBuf::from(datadir.path()))
.expect("should create directory")
.write_keypair_files()
.expect("should write keypair files")
.write_eth1_data_file()
.expect("should write eth1 data file")
.build()
.expect("should build dir");
});
Self::new(context, config, datadir)
}
pub fn production(context: RuntimeContext<E>, config: ValidatorConfig) -> Self {
// Creates a temporary directory that will be deleted once this `TempDir` is dropped.
let datadir = TempDir::new("lighthouse-validator")
.expect("should create temp directory for client datadir");
Self::new(context, config, datadir)
}
fn new(context: RuntimeContext<E>, mut config: ValidatorConfig, datadir: TempDir) -> Self {
config.data_dir = datadir.path().into();
let client =
ProductionValidatorClient::new(context, config).expect("should start validator client");
Self { client, datadir }
}
}

View File

@@ -43,20 +43,36 @@ pub struct ProductionValidatorClient<T: EthSpec> {
impl<T: EthSpec> ProductionValidatorClient<T> { impl<T: EthSpec> ProductionValidatorClient<T> {
/// Instantiates the validator client, _without_ starting the timers to trigger block /// Instantiates the validator client, _without_ starting the timers to trigger block
/// and attestation production. /// and attestation production.
pub fn new_from_cli(context: RuntimeContext<T>, matches: &ArgMatches) -> Result<Self, String> { pub fn new_from_cli(
mut context: RuntimeContext<T>,
matches: &ArgMatches,
) -> Result<Self, String> {
let mut log = context.log.clone(); let mut log = context.log.clone();
let (client_config, eth2_config) = get_configs(&matches, &mut log) let (config, eth2_config) = get_configs(&matches, &mut log)
.map_err(|e| format!("Unable to initialize config: {}", e))?; .map_err(|e| format!("Unable to initialize config: {}", e))?;
// TODO: the eth2 config in the env is being completely ignored.
//
// See https://github.com/sigp/lighthouse/issues/602
context.eth2_config = eth2_config;
Self::new(context, config)
}
/// Instantiates the validator client, _without_ starting the timers to trigger block
/// and attestation production.
pub fn new(context: RuntimeContext<T>, config: Config) -> Result<Self, String> {
let log = context.log.clone();
info!( info!(
log, log,
"Starting validator client"; "Starting validator client";
"datadir" => client_config.full_data_dir().expect("Unable to find datadir").to_str(), "datadir" => config.full_data_dir().expect("Unable to find datadir").to_str(),
); );
let service: Service<ValidatorServiceClient, Keypair, T> = let service: Service<ValidatorServiceClient, Keypair, T> =
Service::initialize_service(client_config, eth2_config, log.clone()) Service::initialize_service(config, context.eth2_config.clone(), log.clone())
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
Ok(Self { Ok(Self {

View File

@@ -161,6 +161,12 @@ impl<E: EthSpec> Service<ValidatorServiceClient, Keypair, E> {
// Load generated keypairs // Load generated keypairs
let keypairs = Arc::new(client_config.fetch_keys(&log)?); let keypairs = Arc::new(client_config.fetch_keys(&log)?);
info!(
log,
"Keypairs loaded";
"local_validator_count" => keypairs.len()
);
let slots_per_epoch = E::slots_per_epoch(); let slots_per_epoch = E::slots_per_epoch();
// TODO: keypairs are randomly generated; they should be loaded from a file or generated. // TODO: keypairs are randomly generated; they should be loaded from a file or generated.