diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index a033da87b6..44eab4fe20 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -64,8 +64,6 @@ impl Client { )); } - println!("Here"); - Ok(Client { config, beacon_chain, diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index e8cff2622e..ea97ef5d45 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -21,3 +21,4 @@ slog-term = "^2.4.0" slog-async = "^2.3.0" tokio = "0.1.18" tokio-timer = "0.2.10" +error-chain = "0.12.0" diff --git a/validator_client/src/error.rs b/validator_client/src/error.rs new file mode 100644 index 0000000000..29d7ba8829 --- /dev/null +++ b/validator_client/src/error.rs @@ -0,0 +1,22 @@ +use slot_clock; + +use error_chain::{ + error_chain, error_chain_processing, impl_error_chain_kind, impl_error_chain_processed, + impl_extract_backtrace, +}; + +error_chain! { + links { } + + errors { + SlotClockError(e: slot_clock::SystemTimeSlotClockError) { + description("Error reading system time"), + display("SlotClockError: '{:?}'", e) + } + + SystemTimeError(t: String ) { + description("Error reading system time"), + display("SystemTimeError: '{}'", t) + } + } +} diff --git a/validator_client/src/main.rs b/validator_client/src/main.rs index 0ec392731b..127df84942 100644 --- a/validator_client/src/main.rs +++ b/validator_client/src/main.rs @@ -2,12 +2,13 @@ mod attester_service; mod block_producer_service; mod config; mod duties; +pub mod error; mod service; use crate::config::Config as ValidatorConfig; use clap::{App, Arg}; use service::Service as ValidatorService; -use slog::{o, Drain}; +use slog::{error, info, o, Drain}; fn main() { // Logging @@ -50,5 +51,8 @@ fn main() { let config = ValidatorConfig::parse_args(matches, &log).unwrap(); // start the validator service. - ValidatorService::start(config, log); + match ValidatorService::start(config, log.clone()) { + Ok(_) => info!(log, "Validator client shutdown successfully."), + Err(e) => error!(log, "Validator exited due to {:?}", e), + } } diff --git a/validator_client/src/service.rs b/validator_client/src/service.rs index c88db29b85..9eeb308db9 100644 --- a/validator_client/src/service.rs +++ b/validator_client/src/service.rs @@ -3,6 +3,8 @@ use crate::attester_service::{AttestationGrpcClient, AttesterService}; use crate::block_producer_service::{BeaconBlockGrpcClient, BlockProducerService}; use crate::config::Config as ValidatorConfig; use crate::duties::{DutiesManager, DutiesManagerService, EpochDutiesMap}; +use crate::error as error_chain; +use crate::error::ErrorKind; use attester::test_utils::EpochMap; use attester::{test_utils::LocalSigner as AttesterLocalSigner, Attester}; use block_proposer::{test_utils::LocalSigner as BlockProposerLocalSigner, BlockProducer}; @@ -13,9 +15,8 @@ use protos::services_grpc::{ AttestationServiceClient, BeaconBlockServiceClient, BeaconNodeServiceClient, ValidatorServiceClient, }; -use slog::{debug, info, warn}; +use slog::{debug, error, info, warn}; use slot_clock::{SlotClock, SystemTimeSlotClock}; -use std::ops::Sub; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; use tokio::prelude::*; @@ -57,7 +58,10 @@ impl Service { /// /// This tries to connect to a beacon node. Once connected, it initialised the gRPC clients /// and returns an instance of the service. - fn initialize_service(config: &ValidatorConfig, log: slog::Logger) -> Self { + fn initialize_service( + config: &ValidatorConfig, + log: slog::Logger, + ) -> error_chain::Result { // initialise the beacon node client to check for a connection let env = Arc::new(EnvBuilder::new().build()); @@ -86,7 +90,7 @@ impl Service { log, "Beacon Node's genesis time is in the future. No work to do.\n Exiting" ); - // return Err("Genesis Time in the future"); + return Err("Genesis time in the future".into()); } break info; } @@ -136,35 +140,37 @@ impl Service { Arc::new(AttestationServiceClient::new(ch)) }; - //TODO: Add error chain. Handle errors - let current_slot = slot_clock.present_slot().unwrap().unwrap().sub(1); + let current_slot = slot_clock + .present_slot() + .map_err(|e| ErrorKind::SlotClockError(e))? + .expect("Genesis must be in the future"); // calculate the duration to the next slot let duration_to_next_slot = { + let seconds_per_slot = config.spec.seconds_per_slot; let syslot_time = SystemTime::now(); - let duration_since_epoch = syslot_time.duration_since(SystemTime::UNIX_EPOCH).unwrap(); - let mut duration_to_next_slot = None; - if let Some(duration_since_genesis) = - duration_since_epoch.checked_sub(Duration::from_secs(genesis_time)) - { - let elapsed_slots = duration_since_epoch - .as_secs() - .checked_div(config.spec.seconds_per_slot as u64) - .unwrap(); - duration_to_next_slot = Some( - Duration::from_secs( - (elapsed_slots + 1) - .checked_mul(config.spec.seconds_per_slot) - .unwrap(), - ) - .checked_sub(duration_since_genesis) - .expect("This should never saturate"), - ); - } - duration_to_next_slot.unwrap_or_else(|| Duration::from_secs(0)) + let duration_since_epoch = syslot_time + .duration_since(SystemTime::UNIX_EPOCH) + .map_err(|e| ErrorKind::SystemTimeError(e.to_string()))?; + let duration_since_genesis = duration_since_epoch + .checked_sub(Duration::from_secs(genesis_time)) + .expect("Genesis must be in the future. Checked on connection"); + let elapsed_slots = duration_since_epoch + .as_secs() + .checked_div(seconds_per_slot as u64) + .expect("Seconds per slot should not be 0"); + + // the duration to the next slot + Duration::from_secs( + (elapsed_slots + 1) + .checked_mul(seconds_per_slot) + .expect("Next slot time should not overflow u64"), + ) + .checked_sub(duration_since_genesis) + .expect("This should never saturate") }; - Self { + Ok(Self { connected_node_version: node_info.version, chain_id: node_info.chain_id as u16, fork, @@ -175,13 +181,13 @@ impl Service { validator_client, attester_client, log, - } + }) } /// Initialise the service then run the core thread. - pub fn start(config: ValidatorConfig, log: slog::Logger) { + pub fn start(config: ValidatorConfig, log: slog::Logger) -> error_chain::Result<()> { // connect to the node and retrieve its properties and initialize the gRPC clients - let service = Service::initialize_service(&config, log); + let service = Service::initialize_service(&config, log)?; // we have connected to a node and established its parameters. Spin up the core service @@ -190,10 +196,9 @@ impl Service { .clock(Clock::system()) .name_prefix("validator-client-") .build() - .unwrap(); + .map_err(|e| format!("Tokio runtime failed: {}", e))?; // set up the validator work interval - start at next slot and proceed every slot - // TODO: Error chain handle errors. let interval = { // Set the interval to start at the next slot, and every slot after let slot_duration = Duration::from_secs(config.spec.seconds_per_slot); @@ -223,16 +228,23 @@ impl Service { beacon_node: service.validator_client.clone(), }; - runtime.block_on(interval.for_each(move |_| { - // update duties - debug!( - service.log, - "Processing slot: {}", - service.slot_clock.present_slot().unwrap().unwrap().as_u64() - ); - manager.poll(); - Ok(()) - })); + runtime + .block_on(interval.for_each(move |_| { + // update duties + let current_slot = match service.slot_clock.present_slot() { + Err(e) => { + error!(service.log, "SystemTimeError {:?}", e); + return Ok(()); + } + Ok(slot) => slot.expect("Genesis is in the future"), + }; + + debug!(service.log, "Processing slot: {}", current_slot.as_u64()); + manager.poll(); + Ok(()) + })) + .map_err(|e| format!("Service thread failed: {:?}", e))?; + Ok(()) } /*