diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index cdde71774f..4bd63715c2 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "validator_client" version = "0.1.0" -authors = ["Paul Hauner "] +authors = ["Paul Hauner ", "Age Manning "] edition = "2018" [dependencies] diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs index 60edc564ad..0bf320b4f9 100644 --- a/validator_client/src/config.rs +++ b/validator_client/src/config.rs @@ -6,7 +6,7 @@ use types::ChainSpec; /// Stores the core configuration for this validator instance. #[derive(Clone)] -pub struct ClientConfig { +pub struct Config { pub data_dir: PathBuf, pub server: String, pub spec: ChainSpec, @@ -14,7 +14,7 @@ pub struct ClientConfig { const DEFAULT_LIGHTHOUSE_DIR: &str = ".lighthouse-validators"; -impl ClientConfig { +impl Config { /// Build a new configuration from defaults. pub fn default() -> Self { let data_dir = { @@ -33,7 +33,7 @@ impl ClientConfig { } pub fn parse_args(matches: ArgMatches, log: &slog::Logger) -> Result { - let mut config = ClientConfig::default(); + let mut config = Config::default(); // Custom datadir if let Some(dir) = matches.value_of("datadir") { config.data_dir = PathBuf::from(dir.to_string()); diff --git a/validator_client/src/main.rs b/validator_client/src/main.rs index c6cf586f3c..0ec392731b 100644 --- a/validator_client/src/main.rs +++ b/validator_client/src/main.rs @@ -1,27 +1,13 @@ -use crate::attester_service::{AttestationGrpcClient, AttesterService}; -use crate::block_producer_service::{BeaconBlockGrpcClient, BlockProducerService}; -use crate::config::ClientConfig; -use crate::duties::{DutiesManager, DutiesManagerService, EpochDutiesMap}; -use attester::test_utils::EpochMap; -use attester::{test_utils::LocalSigner as AttesterLocalSigner, Attester}; -use block_proposer::{test_utils::LocalSigner as BlockProposerLocalSigner, BlockProducer}; -use bls::Keypair; -use clap::{App, Arg}; -use grpcio::{ChannelBuilder, EnvBuilder}; -use protos::services::{Empty, NodeInfo}; -use protos::services_grpc::{ - AttestationServiceClient, BeaconBlockServiceClient, BeaconNodeServiceClient, - ValidatorServiceClient, -}; -use slog::{info, o, Drain}; -use slot_clock::SystemTimeSlotClock; -use std::sync::Arc; -use std::thread; - mod attester_service; mod block_producer_service; mod config; mod duties; +mod service; + +use crate::config::Config as ValidatorConfig; +use clap::{App, Arg}; +use service::Service as ValidatorService; +use slog::{o, Drain}; fn main() { // Logging @@ -61,142 +47,8 @@ fn main() { ) .get_matches(); - let config = ClientConfig::parse_args(matches, &log).unwrap(); + let config = ValidatorConfig::parse_args(matches, &log).unwrap(); - // Beacon node gRPC beacon node endpoints. - let beacon_node_grpc_client = { - let env = Arc::new(EnvBuilder::new().build()); - let ch = ChannelBuilder::new(env).connect(&config.server); - Arc::new(BeaconNodeServiceClient::new(ch)) - }; - - // Beacon node gRPC beacon block endpoints. - let beacon_block_grpc_client = { - let env = Arc::new(EnvBuilder::new().build()); - let ch = ChannelBuilder::new(env).connect(&config.server); - Arc::new(BeaconBlockServiceClient::new(ch)) - }; - - // Beacon node gRPC validator endpoints. - let validator_grpc_client = { - let env = Arc::new(EnvBuilder::new().build()); - let ch = ChannelBuilder::new(env).connect(&config.server); - Arc::new(ValidatorServiceClient::new(ch)) - }; - - //Beacon node gRPC attester endpoints. - let attester_grpc_client = { - let env = Arc::new(EnvBuilder::new().build()); - let ch = ChannelBuilder::new(env).connect(&config.server); - Arc::new(AttestationServiceClient::new(ch)) - }; - - // retrieve node information - let node_info = beacon_node_grpc_client.info(&Empty::new()); - - info!(log, "Beacon node info: {:?}", node_info); - - // Spec - let spec = Arc::new(config.spec.clone()); - - let genesis_time = 1_549_935_547; - let slot_clock = { - info!(log, "Genesis time"; "unix_epoch_seconds" => genesis_time); - let clock = SystemTimeSlotClock::new(genesis_time, spec.seconds_per_slot) - .expect("Unable to instantiate SystemTimeSlotClock."); - Arc::new(clock) - }; - - let poll_interval_millis = spec.seconds_per_slot * 1000 / 10; // 10% epoch time precision. - info!(log, "Starting block producer service"; "polls_per_epoch" => spec.seconds_per_slot * 1000 / poll_interval_millis); - - /* - * Start threads. - */ - let mut threads = vec![]; - // TODO: keypairs are randomly generated; they should be loaded from a file or generated. - // https://github.com/sigp/lighthouse/issues/160 - let keypairs = vec![Keypair::random()]; - - for keypair in keypairs { - info!(log, "Starting validator services"; "validator" => keypair.pk.concatenated_hex_id()); - let duties_map = Arc::new(EpochDutiesMap::new(spec.slots_per_epoch)); - let epoch_map_for_attester = Arc::new(EpochMap::new(spec.slots_per_epoch)); - - // Spawn a new thread to maintain the validator's `EpochDuties`. - let duties_manager_thread = { - let spec = spec.clone(); - let duties_map = duties_map.clone(); - let slot_clock = slot_clock.clone(); - let log = log.clone(); - let beacon_node = validator_grpc_client.clone(); - let pubkey = keypair.pk.clone(); - thread::spawn(move || { - let manager = DutiesManager { - duties_map, - pubkey, - spec, - slot_clock, - beacon_node, - }; - let mut duties_manager_service = DutiesManagerService { - manager, - poll_interval_millis, - log, - }; - - duties_manager_service.run(); - }) - }; - - // Spawn a new thread to perform block production for the validator. - let producer_thread = { - let spec = spec.clone(); - let signer = Arc::new(BlockProposerLocalSigner::new(keypair.clone())); - let duties_map = duties_map.clone(); - let slot_clock = slot_clock.clone(); - let log = log.clone(); - let client = Arc::new(BeaconBlockGrpcClient::new(beacon_block_grpc_client.clone())); - thread::spawn(move || { - let block_producer = - BlockProducer::new(spec, duties_map, slot_clock, client, signer); - let mut block_producer_service = BlockProducerService { - block_producer, - poll_interval_millis, - log, - }; - - block_producer_service.run(); - }) - }; - - // Spawn a new thread for attestation for the validator. - let attester_thread = { - let signer = Arc::new(AttesterLocalSigner::new(keypair.clone())); - let epoch_map = epoch_map_for_attester.clone(); - let slot_clock = slot_clock.clone(); - let log = log.clone(); - let client = Arc::new(AttestationGrpcClient::new(attester_grpc_client.clone())); - thread::spawn(move || { - let attester = Attester::new(epoch_map, slot_clock, client, signer); - let mut attester_service = AttesterService { - attester, - poll_interval_millis, - log, - }; - - attester_service.run(); - }) - }; - - threads.push((duties_manager_thread, producer_thread, attester_thread)); - } - - // Naively wait for all the threads to complete. - for tuple in threads { - let (manager, producer, attester) = tuple; - let _ = producer.join(); - let _ = manager.join(); - let _ = attester.join(); - } + // start the validator service. + ValidatorService::start(config, log); } diff --git a/validator_client/src/service.rs b/validator_client/src/service.rs new file mode 100644 index 0000000000..95f2a23614 --- /dev/null +++ b/validator_client/src/service.rs @@ -0,0 +1,166 @@ +/// The validator service. Connects to a beacon node and signs blocks when required. +use crate::attester_service::{AttestationGrpcClient, AttesterService}; +use crate::block_producer_service::{BeaconBlockGrpcClient, BlockProducerService}; +use crate::config::Config as ValidatorConfig; +use crate::duties::{DutiesManager, DutiesManagerService, EpochDutiesMap}; +use attester::test_utils::EpochMap; +use attester::{test_utils::LocalSigner as AttesterLocalSigner, Attester}; +use block_proposer::{test_utils::LocalSigner as BlockProposerLocalSigner, BlockProducer}; +use bls::Keypair; +use grpcio::{ChannelBuilder, EnvBuilder}; +use protos::services::{Empty, NodeInfo}; +use protos::services_grpc::{ + AttestationServiceClient, BeaconBlockServiceClient, BeaconNodeServiceClient, + ValidatorServiceClient, +}; +use slog::{info, o, Drain}; +use slot_clock::SystemTimeSlotClock; +use std::sync::Arc; +use std::thread; + +/// The validator service. This is the main thread that executes and maintains validator +/// duties. +pub struct Service {} + +impl Service { + pub fn start(config: ValidatorConfig, log: slog::Logger) { + // initialize the RPC clients + + let env = Arc::new(EnvBuilder::new().build()); + // Beacon node gRPC beacon node endpoints. + let beacon_node_grpc_client = { + let ch = ChannelBuilder::new(env.clone()).connect(&config.server); + Arc::new(BeaconNodeServiceClient::new(ch)) + }; + + // Beacon node gRPC beacon block endpoints. + let beacon_block_grpc_client = { + let ch = ChannelBuilder::new(env.clone()).connect(&config.server); + Arc::new(BeaconBlockServiceClient::new(ch)) + }; + + // Beacon node gRPC validator endpoints. + let validator_grpc_client = { + let ch = ChannelBuilder::new(env.clone()).connect(&config.server); + Arc::new(ValidatorServiceClient::new(ch)) + }; + + //Beacon node gRPC attester endpoints. + let attester_grpc_client = { + let ch = ChannelBuilder::new(env.clone()).connect(&config.server); + Arc::new(AttestationServiceClient::new(ch)) + }; + + // connect to the node and retrieve its properties + // node_info = connect_to_node(beacon_ndoe_grpc_client); + + // retrieve node information + let node_info = beacon_node_grpc_client.info(&Empty::new()); + + info!(log, "Beacon node info: {:?}", node_info); + + // Spec + let spec = Arc::new(config.spec.clone()); + + let genesis_time = 1_549_935_547; + let slot_clock = { + info!(log, "Genesis time"; "unix_epoch_seconds" => genesis_time); + let clock = SystemTimeSlotClock::new(genesis_time, spec.seconds_per_slot) + .expect("Unable to instantiate SystemTimeSlotClock."); + Arc::new(clock) + }; + + let poll_interval_millis = spec.seconds_per_slot * 1000 / 10; // 10% epoch time precision. + info!(log, "Starting block producer service"; "polls_per_epoch" => spec.seconds_per_slot * 1000 / poll_interval_millis); + + /* + * Start threads. + */ + let mut threads = vec![]; + // TODO: keypairs are randomly generated; they should be loaded from a file or generated. + // https://github.com/sigp/lighthouse/issues/160 + let keypairs = vec![Keypair::random()]; + + for keypair in keypairs { + info!(log, "Starting validator services"; "validator" => keypair.pk.concatenated_hex_id()); + let duties_map = Arc::new(EpochDutiesMap::new(spec.slots_per_epoch)); + let epoch_map_for_attester = Arc::new(EpochMap::new(spec.slots_per_epoch)); + + // Spawn a new thread to maintain the validator's `EpochDuties`. + let duties_manager_thread = { + let spec = spec.clone(); + let duties_map = duties_map.clone(); + let slot_clock = slot_clock.clone(); + let log = log.clone(); + let beacon_node = validator_grpc_client.clone(); + let pubkey = keypair.pk.clone(); + thread::spawn(move || { + let manager = DutiesManager { + duties_map, + pubkey, + spec, + slot_clock, + beacon_node, + }; + let mut duties_manager_service = DutiesManagerService { + manager, + poll_interval_millis, + log, + }; + + duties_manager_service.run(); + }) + }; + + // Spawn a new thread to perform block production for the validator. + let producer_thread = { + let spec = spec.clone(); + let signer = Arc::new(BlockProposerLocalSigner::new(keypair.clone())); + let duties_map = duties_map.clone(); + let slot_clock = slot_clock.clone(); + let log = log.clone(); + let client = Arc::new(BeaconBlockGrpcClient::new(beacon_block_grpc_client.clone())); + thread::spawn(move || { + let block_producer = + BlockProducer::new(spec, duties_map, slot_clock, client, signer); + let mut block_producer_service = BlockProducerService { + block_producer, + poll_interval_millis, + log, + }; + + block_producer_service.run(); + }) + }; + + // Spawn a new thread for attestation for the validator. + let attester_thread = { + let signer = Arc::new(AttesterLocalSigner::new(keypair.clone())); + let epoch_map = epoch_map_for_attester.clone(); + let slot_clock = slot_clock.clone(); + let log = log.clone(); + let client = Arc::new(AttestationGrpcClient::new(attester_grpc_client.clone())); + thread::spawn(move || { + let attester = Attester::new(epoch_map, slot_clock, client, signer); + let mut attester_service = AttesterService { + attester, + poll_interval_millis, + log, + }; + + attester_service.run(); + }) + }; + + threads.push((duties_manager_thread, producer_thread, attester_thread)); + } + + // Naively wait for all the threads to complete. + for tuple in threads { + let (manager, producer, attester) = tuple; + let _ = producer.join(); + let _ = manager.join(); + let _ = attester.join(); + } + } +}