From 56d33d2e26866e6ebdd02835bb75b5bb3c5a2945 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Sat, 23 Mar 2019 11:48:36 +1100 Subject: [PATCH] Basic tokio slot stream implementation --- validator_client/Cargo.toml | 4 +- validator_client/src/duties/mod.rs | 13 +- validator_client/src/service.rs | 291 ++++++++++++++++------------- 3 files changed, 170 insertions(+), 138 deletions(-) diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index 4bd63715c2..e8cff2622e 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" block_proposer = { path = "../eth2/block_proposer" } attester = { path = "../eth2/attester" } bls = { path = "../eth2/utils/bls" } +ssz = { path = "../eth2/utils/ssz" } clap = "2.32.0" dirs = "1.0.3" grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] } @@ -18,4 +19,5 @@ types = { path = "../eth2/types" } slog = "^2.2.3" slog-term = "^2.4.0" slog-async = "^2.3.0" -ssz = { path = "../eth2/utils/ssz" } +tokio = "0.1.18" +tokio-timer = "0.2.10" diff --git a/validator_client/src/duties/mod.rs b/validator_client/src/duties/mod.rs index 29bd81d0aa..c2b95b1c53 100644 --- a/validator_client/src/duties/mod.rs +++ b/validator_client/src/duties/mod.rs @@ -39,11 +39,11 @@ pub enum Error { /// A polling state machine which ensures the latest `EpochDuties` are obtained from the Beacon /// Node. /// -/// There is a single `DutiesManager` per validator instance. +/// This keeps track of all validator keys and required voting slots. pub struct DutiesManager { pub duties_map: Arc, - /// The validator's public key. - pub pubkey: PublicKey, + /// A list of all public keys known to the validator service. + pub pubkeys: Vec, pub spec: Arc, pub slot_clock: Arc, pub beacon_node: Arc, @@ -54,6 +54,8 @@ impl DutiesManager { /// /// The present `epoch` will be learned from the supplied `SlotClock`. In production this will /// be a wall-clock (e.g., system time, remote server time, etc.). + //TODO: Remove the poll and trust the tokio system-clock timer. Leave for now to ensure the + //timer is accurate. pub fn poll(&self) -> Result { let slot = self .slot_clock @@ -63,7 +65,10 @@ impl DutiesManager { let epoch = slot.epoch(self.spec.slots_per_epoch); - if let Some(duties) = self.beacon_node.request_shuffling(epoch, &self.pubkey)? { + if let Some(duties) = self + .beacon_node + .request_shuffling(epoch, &self.pubkeys[0])? + { // If these duties were known, check to see if they're updates or identical. let result = if let Some(known_duties) = self.duties_map.get(epoch)? { if known_duties == duties { diff --git a/validator_client/src/service.rs b/validator_client/src/service.rs index 0934a5a160..322c370b5e 100644 --- a/validator_client/src/service.rs +++ b/validator_client/src/service.rs @@ -8,17 +8,23 @@ use attester::{test_utils::LocalSigner as AttesterLocalSigner, Attester}; use block_proposer::{test_utils::LocalSigner as BlockProposerLocalSigner, BlockProducer}; use bls::Keypair; use grpcio::{ChannelBuilder, EnvBuilder}; -use protos::services::{Empty, NodeInfo}; +use protos::services::Empty; use protos::services_grpc::{ AttestationServiceClient, BeaconBlockServiceClient, BeaconNodeServiceClient, ValidatorServiceClient, }; -use slog::{info, o, warn, Drain}; -use slot_clock::SystemTimeSlotClock; +use slog::{debug, info, warn}; +use slot_clock::{SlotClock, SystemTimeSlotClock}; +use std::ops::Sub; use std::sync::Arc; -use std::thread; -use std::time::Duration; -use types::{Epoch, Fork}; +use std::time::{Duration, Instant, SystemTime}; +use tokio::prelude::*; +use tokio::runtime::Builder; +use tokio::timer::Interval; +use tokio_timer::clock::Clock; +use types::{Epoch, Fork, Slot}; + +//TODO: This service should be simplified in the future. Can be made more steamlined. /// The validator service. This is the main thread that executes and maintains validator /// duties. @@ -33,6 +39,8 @@ pub struct Service { slot_clock: Arc, /// The current slot we are processing. current_slot: Slot, + /// Seconds until the next slot. This is used for initializing the tokio timer interval. + seconds_to_next_slot: Duration, // GRPC Clients /// The beacon block GRPC client. beacon_block_client: Arc, @@ -45,15 +53,6 @@ pub struct Service { } impl Service { - /// Initialise the service then run the core thread. - pub fn start(config: ValidatorConfig, log: slog::Logger) { - // connect to the node and retrieve its properties and initialize the gRPC clients - let service = Service::initialize_service(&config, log); - - // we have connected to a node and established its parameters. Spin up the core service - service.run(config); - } - /// Initial connection to the beacon node to determine its properties. /// /// This tries to connect to a beacon node. Once connected, it initialised the gRPC clients @@ -124,7 +123,24 @@ impl Service { Arc::new(AttestationServiceClient::new(ch)) }; - let current_slot = slot_clock.present_slot().saturating_sub(1); + //TODO: Add error chain. Handle errors + let current_slot = slot_clock.present_slot().unwrap().unwrap().sub(1); + + // calculate seconds to the next slot + let seconds_to_next_slot = { + let syslot_time = SystemTime::now(); + let duration_since_epoch = syslot_time.duration_since(SystemTime::UNIX_EPOCH).unwrap(); + let mut secs_to_slot = None; + if let Some(duration_since_genesis) = + duration_since_epoch.checked_sub(Duration::from_secs(genesis_time)) + { + // seconds till next slot + secs_to_slot = duration_since_genesis + .as_secs() + .checked_rem(config.spec.seconds_per_slot); + } + secs_to_slot.unwrap_or_else(|| 0) + }; Self { connected_node_version: node_info.version, @@ -132,6 +148,7 @@ impl Service { fork, slot_clock, current_slot, + seconds_to_next_slot: Duration::from_secs(seconds_to_next_slot), beacon_block_client, validator_client, attester_client, @@ -139,132 +156,140 @@ impl Service { } } - fn run(&mut self, config: ValidatorConfig) { + /// Initialise the service then run the core thread. + pub fn start(config: ValidatorConfig, log: slog::Logger) { + // connect to the node and retrieve its properties and initialize the gRPC clients + let service = Service::initialize_service(&config, log); + + // we have connected to a node and established its parameters. Spin up the core service + + // set up the validator service runtime + let mut runtime = Builder::new() + .clock(Clock::system()) + .name_prefix("validator-client-") + .build() + .unwrap(); + + // set up the validator work interval - start at next slot and proceed every slot + // TODO: Error chain handle errors. + let interval = { + // Set the interval to start at the next slot, and every slot after + let slot_duration = Duration::from_secs(config.spec.seconds_per_slot); + //TODO: Handle checked add correctly + Interval::new(Instant::now() + service.seconds_to_next_slot, slot_duration) + }; + + // kick off core service // generate keypairs // TODO: keypairs are randomly generated; they should be loaded from a file or generated. // https://github.com/sigp/lighthouse/issues/160 - let keypairs = vec![Keypair::random()]; + let keypairs = Arc::new(vec![Keypair::random()]); - // set up the validator service runtime - let runtime = Builder::new().clock(Clock::system()).name_prefix("validator-client-").build().unwrap(); - - // set up the validator work interval - start at next slot and proceed every slot - let interval = { - let time_to_next_slot = { - let syslot_time = SystemTime::now(); - let duration_since_epoch = syslot_time.duration_since(SystemTime::UNIX_EPOCH)?; - let mut secs_to_slot = None; - if let Some(duration_since_genesis) = - duration_since_epoch.checked_sub(Duration::from_secs(self.genesis_seconds)) { - // seconds till next slot - secs_to_slot =duration_since_genesis.as_secs().checked_rem(config.spec.seconds_per_slot); - } - secs_to_slot.ok_or_else(0) - } - // Set the interval to start at the next slot, and every slot after - let slot_duration = Duration::from_secs(config.spec.seconds_per_slot); - //TODO: Handle checked add correctly - Interval::new(Instant::now().checked_add(secs_to_slot)?, slot_duration) - } - - // kick off core service - runtime.spawn(interval.for_each(|_| {})); - - - - let duties_map = Arc::new(EpochDutiesMap::new(spec.slots_per_epoch)); - let epoch_map_for_attester = Arc::new(EpochMap::new(spec.slots_per_epoch)); + // build requisite objects to pass to core thread. + let duties_map = Arc::new(EpochDutiesMap::new(config.spec.slots_per_epoch)); + let epoch_map_for_attester = Arc::new(EpochMap::new(config.spec.slots_per_epoch)); let manager = DutiesManager { duties_map, - pubkey, - spec, - slot_clock, - beacon_node, + pubkeys: keypairs.iter().map(|keypair| keypair.pk.clone()).collect(), + spec: Arc::new(config.spec), + slot_clock: service.slot_clock.clone(), + beacon_node: service.validator_client.clone(), }; - - for keypair in keypairs { - info!(self.log, "Starting validator services"; "validator" => keypair.pk.concatenated_hex_id()); - let duties_map = Arc::new(EpochDutiesMap::new(spec.slots_per_epoch)); - let epoch_map_for_attester = Arc::new(EpochMap::new(spec.slots_per_epoch)); - - // Spawn a new thread to maintain the validator's `EpochDuties`. - let duties_manager_thread = { - let spec = spec.clone(); - let duties_map = duties_map.clone(); - let slot_clock = self.slot_clock.clone(); - let log = self.log.clone(); - let beacon_node = self.validator_client.clone(); - let pubkey = keypair.pk.clone(); - thread::spawn(move || { - let manager = DutiesManager { - duties_map, - pubkey, - spec, - slot_clock, - beacon_node, - }; - let mut duties_manager_service = DutiesManagerService { - manager, - poll_interval_millis, - log, - }; - - duties_manager_service.run(); - }) - }; - - // Spawn a new thread to perform block production for the validator. - let producer_thread = { - let spec = spec.clone(); - let signer = Arc::new(BlockProposerLocalSigner::new(keypair.clone())); - let duties_map = duties_map.clone(); - let slot_clock = slot_clock.clone(); - let log = log.clone(); - let client = Arc::new(BeaconBlockGrpcClient::new(beacon_block_grpc_client.clone())); - thread::spawn(move || { - let block_producer = - BlockProducer::new(spec, duties_map, slot_clock, client, signer); - let mut block_producer_service = BlockProducerService { - block_producer, - poll_interval_millis, - log, - }; - - block_producer_service.run(); - }) - }; - - // Spawn a new thread for attestation for the validator. - let attester_thread = { - let signer = Arc::new(AttesterLocalSigner::new(keypair.clone())); - let epoch_map = epoch_map_for_attester.clone(); - let slot_clock = slot_clock.clone(); - let log = log.clone(); - let client = Arc::new(AttestationGrpcClient::new(attester_grpc_client.clone())); - thread::spawn(move || { - let attester = Attester::new(epoch_map, slot_clock, client, signer); - let mut attester_service = AttesterService { - attester, - poll_interval_millis, - log, - }; - - attester_service.run(); - }) - }; - - threads.push((duties_manager_thread, producer_thread, attester_thread)); - } - - // Naively wait for all the threads to complete. - for tuple in threads { - let (manager, producer, attester) = tuple; - let _ = producer.join(); - let _ = manager.join(); - let _ = attester.join(); - } + runtime.block_on(interval.for_each(move |_| { + // update duties + debug!(service.log, "Processing new slot..."); + manager.poll(); + Ok(()) + })); } + + /* + + let duties_map = Arc::new(EpochDutiesMap::new(spec.slots_per_epoch)); + let epoch_map_for_attester = Arc::new(EpochMap::new(spec.slots_per_epoch)); + + + for keypair in keypairs { + info!(self.log, "Starting validator services"; "validator" => keypair.pk.concatenated_hex_id()); + + // Spawn a new thread to maintain the validator's `EpochDuties`. + let duties_manager_thread = { + let spec = spec.clone(); + let duties_map = duties_map.clone(); + let slot_clock = self.slot_clock.clone(); + let log = self.log.clone(); + let beacon_node = self.validator_client.clone(); + let pubkey = keypair.pk.clone(); + thread::spawn(move || { + let manager = DutiesManager { + duties_map, + pubkey, + spec, + slot_clock, + beacon_node, + }; + let mut duties_manager_service = DutiesManagerService { + manager, + poll_interval_millis, + log, + }; + + duties_manager_service.run(); + }) + }; + + // Spawn a new thread to perform block production for the validator. + let producer_thread = { + let spec = spec.clone(); + let signer = Arc::new(BlockProposerLocalSigner::new(keypair.clone())); + let duties_map = duties_map.clone(); + let slot_clock = slot_clock.clone(); + let log = log.clone(); + let client = Arc::new(BeaconBlockGrpcClient::new(beacon_block_grpc_client.clone())); + thread::spawn(move || { + let block_producer = + BlockProducer::new(spec, duties_map, slot_clock, client, signer); + let mut block_producer_service = BlockProducerService { + block_producer, + poll_interval_millis, + log, + }; + + block_producer_service.run(); + }) + }; + + // Spawn a new thread for attestation for the validator. + let attester_thread = { + let signer = Arc::new(AttesterLocalSigner::new(keypair.clone())); + let epoch_map = epoch_map_for_attester.clone(); + let slot_clock = slot_clock.clone(); + let log = log.clone(); + let client = Arc::new(AttestationGrpcClient::new(attester_grpc_client.clone())); + thread::spawn(move || { + let attester = Attester::new(epoch_map, slot_clock, client, signer); + let mut attester_service = AttesterService { + attester, + poll_interval_millis, + log, + }; + + attester_service.run(); + }) + }; + + threads.push((duties_manager_thread, producer_thread, attester_thread)); + } + + // Naively wait for all the threads to complete. + for tuple in threads { + let (manager, producer, attester) = tuple; + let _ = producer.join(); + let _ = manager.join(); + let _ = attester.join(); + } + */ }