From 114067bb50a30266b2150813a2746ebcb6d02b9d Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Fri, 22 Nov 2019 01:22:05 +1100 Subject: [PATCH] Start heavy refactor of validator client - Block production is working --- account_manager/src/lib.rs | 2 +- beacon_node/rest_api/src/beacon.rs | 8 + beacon_node/rest_api/src/node.rs | 8 - beacon_node/rest_api/src/router.rs | 6 +- beacon_node/rest_api/src/validator.rs | 23 +- beacon_node/rest_api/tests/test.rs | 122 ++++--- beacon_node/tests/test.rs | 6 +- eth2/utils/remote_beacon_node/Cargo.toml | 1 + eth2/utils/remote_beacon_node/src/lib.rs | 44 ++- eth2/utils/slot_clock/src/lib.rs | 3 + .../slot_clock/src/system_time_slot_clock.rs | 29 ++ .../slot_clock/src/testing_slot_clock.rs | 5 + lighthouse/src/main.rs | 7 +- tests/node_test_rig/src/lib.rs | 5 +- validator_client/Cargo.toml | 1 + validator_client/src/block_service.rs | 245 ++++++++++++++ validator_client/src/duties_service.rs | 307 ++++++++++++++++++ validator_client/src/fork_service.rs | 157 +++++++++ validator_client/src/lib.rs | 227 ++++++++----- validator_client/src/validator_store.rs | 109 +++++++ 20 files changed, 1165 insertions(+), 150 deletions(-) create mode 100644 validator_client/src/block_service.rs create mode 100644 validator_client/src/duties_service.rs create mode 100644 validator_client/src/fork_service.rs create mode 100644 validator_client/src/validator_store.rs diff --git a/account_manager/src/lib.rs b/account_manager/src/lib.rs index 7c19e953c0..0d0db31edd 100644 --- a/account_manager/src/lib.rs +++ b/account_manager/src/lib.rs @@ -37,7 +37,7 @@ fn run_account_manager( } }; default_dir.push(".lighthouse"); - default_dir.push("validator"); + default_dir.push("validators"); default_dir }); diff --git a/beacon_node/rest_api/src/beacon.rs b/beacon_node/rest_api/src/beacon.rs index 1857336601..2a95eba5de 100644 --- a/beacon_node/rest_api/src/beacon.rs +++ b/beacon_node/rest_api/src/beacon.rs @@ -249,3 +249,11 @@ pub fn get_genesis_state( ResponseBuilder::new(&req)?.body(&state) } + +/// Read the genesis time from the current beacon chain state. +pub fn get_genesis_time( + req: Request, + beacon_chain: Arc>, +) -> ApiResult { + ResponseBuilder::new(&req)?.body(&beacon_chain.head().beacon_state.genesis_time) +} diff --git a/beacon_node/rest_api/src/node.rs b/beacon_node/rest_api/src/node.rs index bce99e50c6..cf9059ad7d 100644 --- a/beacon_node/rest_api/src/node.rs +++ b/beacon_node/rest_api/src/node.rs @@ -9,11 +9,3 @@ use version; pub fn get_version(req: Request) -> ApiResult { ResponseBuilder::new(&req)?.body_no_ssz(&version::version()) } - -/// Read the genesis time from the current beacon chain state. -pub fn get_genesis_time( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { - ResponseBuilder::new(&req)?.body(&beacon_chain.head().beacon_state.genesis_time) -} diff --git a/beacon_node/rest_api/src/router.rs b/beacon_node/rest_api/src/router.rs index 9f323fbe3f..f0dc4e2c5a 100644 --- a/beacon_node/rest_api/src/router.rs +++ b/beacon_node/rest_api/src/router.rs @@ -38,9 +38,6 @@ pub fn route( match (req.method(), path.as_ref()) { // Methods for Client (&Method::GET, "/node/version") => into_boxfut(node::get_version(req)), - (&Method::GET, "/node/genesis_time") => { - into_boxfut(node::get_genesis_time::(req, beacon_chain)) - } (&Method::GET, "/node/syncing") => { into_boxfut(helpers::implementation_pending_response(req)) } @@ -83,6 +80,9 @@ pub fn route( (&Method::GET, "/beacon/attestations/pending") => { into_boxfut(helpers::implementation_pending_response(req)) } + (&Method::GET, "/beacon/genesis_time") => { + into_boxfut(beacon::get_genesis_time::(req, beacon_chain)) + } (&Method::GET, "/beacon/validators") => { into_boxfut(beacon::get_validators::(req, beacon_chain)) diff --git a/beacon_node/rest_api/src/validator.rs b/beacon_node/rest_api/src/validator.rs index 0d40c9c06b..0d2a982b17 100644 --- a/beacon_node/rest_api/src/validator.rs +++ b/beacon_node/rest_api/src/validator.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use types::beacon_state::EthSpec; use types::{Attestation, BeaconBlock, CommitteeIndex, RelativeEpoch, Slot}; -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] pub struct ValidatorDuty { /// The validator's BLS public key, uniquely identifying them. _48-bytes, hex encoded with 0x prefix, case insensitive._ pub validator_pubkey: PublicKey, @@ -45,19 +45,23 @@ pub fn get_validator_duties( let epoch = query.epoch()?; - let mut head_state = beacon_chain.head().beacon_state; + let mut state = beacon_chain + .state_at_slot(epoch.start_slot(T::EthSpec::slots_per_epoch())) + .map_err(|e| { + ApiError::ServerError(format!("Unable to load state for epoch {}: {:?}", epoch, e)) + })?; - let current_epoch = head_state.current_epoch(); + let current_epoch = state.current_epoch(); let relative_epoch = RelativeEpoch::from_epoch(current_epoch, epoch).map_err(|_| { ApiError::BadRequest(format!( "Epoch must be within one epoch of the current epoch", )) })?; - head_state + state .build_committee_cache(relative_epoch, &beacon_chain.spec) .map_err(|e| ApiError::ServerError(format!("Unable to build committee cache: {:?}", e)))?; - head_state + state .update_pubkey_cache() .map_err(|e| ApiError::ServerError(format!("Unable to build pubkey cache: {:?}", e)))?; @@ -67,7 +71,7 @@ pub fn get_validator_duties( let validator_proposers: Vec<(usize, Slot)> = epoch .slot_iter(T::EthSpec::slots_per_epoch()) .map(|slot| { - head_state + state .get_beacon_proposer_index(slot, &beacon_chain.spec) .map(|i| (i, slot)) .map_err(|e| { @@ -84,13 +88,12 @@ pub fn get_validator_duties( .iter() .map(|validator_pubkey_str| { parse_pubkey(validator_pubkey_str).and_then(|validator_pubkey| { - if let Some(validator_index) = head_state - .get_validator_index(&validator_pubkey) - .map_err(|e| { + if let Some(validator_index) = + state.get_validator_index(&validator_pubkey).map_err(|e| { ApiError::ServerError(format!("Unable to read pubkey cache: {:?}", e)) })? { - let duties = head_state + let duties = state .get_attestation_duties(validator_index, relative_epoch) .map_err(|e| { ApiError::ServerError(format!( diff --git a/beacon_node/rest_api/tests/test.rs b/beacon_node/rest_api/tests/test.rs index fee986621e..38c52fd807 100644 --- a/beacon_node/rest_api/tests/test.rs +++ b/beacon_node/rest_api/tests/test.rs @@ -3,7 +3,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes}; use node_test_rig::{ environment::{Environment, EnvironmentBuilder}, - LocalBeaconNode, + testing_client_config, ClientGenesis, LocalBeaconNode, }; use remote_beacon_node::PublishStatus; use std::sync::Arc; @@ -66,7 +66,7 @@ fn validator_produce_attestation() { let spec = &E::default_spec(); - let node = LocalBeaconNode::production(env.core_context()); + let node = LocalBeaconNode::production(env.core_context(), testing_client_config()); let remote_node = node.remote_node().expect("should produce remote node"); let beacon_chain = node @@ -75,36 +75,6 @@ fn validator_produce_attestation() { .expect("client should have beacon chain"); let state = beacon_chain.head().beacon_state.clone(); - /* - // Publish a block so that we're not attesting to the genesis block. - { - let slot = Slot::new(1); - let randao_reveal = get_randao_reveal(beacon_chain.clone(), slot, spec); - - let mut block = env - .runtime() - .block_on( - remote_node - .http - .validator() - .produce_block(slot, randao_reveal.clone()), - ) - .expect("should fetch block from http api"); - - sign_block(beacon_chain.clone(), &mut block, spec); - - let publish_status = env - .runtime() - .block_on(remote_node.http.validator().publish_block(block.clone())) - .expect("should publish block"); - assert_eq!( - publish_status, - PublishStatus::Valid, - "should have published block" - ); - } - */ - let validator_index = 0; let duties = state .get_attestation_duties(validator_index, RelativeEpoch::Current) @@ -195,7 +165,7 @@ fn validator_duties() { let spec = &E::default_spec(); - let node = LocalBeaconNode::production(env.core_context()); + let node = LocalBeaconNode::production(env.core_context(), testing_client_config()); let remote_node = node.remote_node().expect("should produce remote node"); let beacon_chain = node @@ -282,7 +252,13 @@ fn validator_block_post() { let spec = &E::default_spec(); - let node = LocalBeaconNode::production(env.core_context()); + let mut config = testing_client_config(); + config.genesis = ClientGenesis::Interop { + validator_count: 8, + genesis_time: 13_371_337, + }; + + let node = LocalBeaconNode::production(env.core_context(), config); let remote_node = node.remote_node().expect("should produce remote node"); let beacon_chain = node @@ -343,7 +319,7 @@ fn validator_block_get() { let spec = &E::default_spec(); - let node = LocalBeaconNode::production(env.core_context()); + let node = LocalBeaconNode::production(env.core_context(), testing_client_config()); let remote_node = node.remote_node().expect("should produce remote node"); let beacon_chain = node @@ -381,7 +357,7 @@ fn validator_block_get() { fn beacon_state() { let mut env = build_env(); - let node = LocalBeaconNode::production(env.core_context()); + let node = LocalBeaconNode::production(env.core_context(), testing_client_config()); let remote_node = node.remote_node().expect("should produce remote node"); let (state_by_slot, root) = env @@ -425,7 +401,7 @@ fn beacon_state() { fn beacon_block() { let mut env = build_env(); - let node = LocalBeaconNode::production(env.core_context()); + let node = LocalBeaconNode::production(env.core_context(), testing_client_config()); let remote_node = node.remote_node().expect("should produce remote node"); let (block_by_slot, root) = env @@ -464,3 +440,75 @@ fn beacon_block() { "genesis block by root from api should match that from the DB" ); } + +#[test] +fn genesis_time() { + let mut env = build_env(); + + let node = LocalBeaconNode::production(env.core_context(), testing_client_config()); + let remote_node = node.remote_node().expect("should produce remote node"); + + let genesis_time = env + .runtime() + .block_on(remote_node.http.beacon().get_genesis_time()) + .expect("should fetch genesis time from http api"); + + assert_eq!( + node.client + .beacon_chain() + .expect("should have beacon chain") + .head() + .beacon_state + .genesis_time, + genesis_time, + "should match genesis time from head state" + ); +} + +#[test] +fn fork() { + let mut env = build_env(); + + let node = LocalBeaconNode::production(env.core_context(), testing_client_config()); + let remote_node = node.remote_node().expect("should produce remote node"); + + let fork = env + .runtime() + .block_on(remote_node.http.beacon().get_fork()) + .expect("should fetch from http api"); + + assert_eq!( + node.client + .beacon_chain() + .expect("should have beacon chain") + .head() + .beacon_state + .fork, + fork, + "should match head state" + ); +} + +#[test] +fn eth2_config() { + let mut env = build_env(); + + let node = LocalBeaconNode::production(env.core_context(), testing_client_config()); + let remote_node = node.remote_node().expect("should produce remote node"); + + let eth2_config = env + .runtime() + .block_on(remote_node.http.spec().get_eth2_config()) + .expect("should fetch eth2 config from http api"); + + // TODO: check the entire eth2_config, not just the spec. + + assert_eq!( + node.client + .beacon_chain() + .expect("should have beacon chain") + .spec, + eth2_config.spec, + "should match genesis time from head state" + ); +} diff --git a/beacon_node/tests/test.rs b/beacon_node/tests/test.rs index 4492c2f882..39a0fd091b 100644 --- a/beacon_node/tests/test.rs +++ b/beacon_node/tests/test.rs @@ -1,6 +1,6 @@ #![cfg(test)] -use node_test_rig::{environment::EnvironmentBuilder, LocalBeaconNode}; +use node_test_rig::{environment::EnvironmentBuilder, testing_client_config, LocalBeaconNode}; use types::{MinimalEthSpec, Slot}; fn env_builder() -> EnvironmentBuilder { @@ -17,12 +17,12 @@ fn http_server_genesis_state() { .build() .expect("environment should build"); - let node = LocalBeaconNode::production(env.core_context()); + let node = LocalBeaconNode::production(env.core_context(), testing_client_config()); let remote_node = node.remote_node().expect("should produce remote node"); let (api_state, _root) = env .runtime() - .block_on(remote_node.http.beacon().state_at_slot(Slot::new(0))) + .block_on(remote_node.http.beacon().state_by_slot(Slot::new(0))) .expect("should fetch state from http api"); let mut db_state = node diff --git a/eth2/utils/remote_beacon_node/Cargo.toml b/eth2/utils/remote_beacon_node/Cargo.toml index 9cc0649ee5..68a5e5df0d 100644 --- a/eth2/utils/remote_beacon_node/Cargo.toml +++ b/eth2/utils/remote_beacon_node/Cargo.toml @@ -16,3 +16,4 @@ rest_api = { path = "../../../beacon_node/rest_api" } hex = "0.3" eth2_ssz = { path = "../../../eth2/utils/ssz" } serde_json = "^1.0" +eth2_config = { path = "../../../eth2/utils/eth2_config" } diff --git a/eth2/utils/remote_beacon_node/src/lib.rs b/eth2/utils/remote_beacon_node/src/lib.rs index 92904c268f..369de9159e 100644 --- a/eth2/utils/remote_beacon_node/src/lib.rs +++ b/eth2/utils/remote_beacon_node/src/lib.rs @@ -3,6 +3,7 @@ //! //! Presently, this is only used for testing but it _could_ become a user-facing library. +use eth2_config::Eth2Config; use futures::{future, Future, IntoFuture}; use reqwest::{ r#async::{Client, ClientBuilder, Response}, @@ -14,8 +15,8 @@ use std::marker::PhantomData; use std::net::SocketAddr; use std::time::Duration; use types::{ - Attestation, BeaconBlock, BeaconState, CommitteeIndex, Epoch, EthSpec, Hash256, PublicKey, - Signature, Slot, + Attestation, BeaconBlock, BeaconState, CommitteeIndex, Epoch, EthSpec, Fork, Hash256, + PublicKey, Signature, Slot, }; use url::Url; @@ -23,6 +24,7 @@ pub use rest_api::{HeadResponse, ValidatorDuty}; pub const REQUEST_TIMEOUT_SECONDS: u64 = 5; +#[derive(Clone)] /// Connects to a remote Lighthouse (or compatible) node via HTTP. pub struct RemoteBeaconNode { pub http: HttpClient, @@ -81,6 +83,10 @@ impl HttpClient { Validator(self.clone()) } + pub fn spec(&self) -> Spec { + Spec(self.clone()) + } + fn url(&self, path: &str) -> Result { self.url.join(path).map_err(|e| e.into()) } @@ -284,6 +290,20 @@ impl Beacon { .map_err(Into::into) } + pub fn get_genesis_time(&self) -> impl Future { + let client = self.0.clone(); + self.url("genesis_time") + .into_future() + .and_then(move |url| client.json_get(url, vec![])) + } + + pub fn get_fork(&self) -> impl Future { + let client = self.0.clone(); + self.url("fork") + .into_future() + .and_then(move |url| client.json_get(url, vec![])) + } + pub fn get_head(&self) -> impl Future { let client = self.0.clone(); self.url("head") @@ -354,6 +374,26 @@ impl Beacon { } } +/// Provides the functions on the `/spec` endpoint of the node. +#[derive(Clone)] +pub struct Spec(HttpClient); + +impl Spec { + fn url(&self, path: &str) -> Result { + self.0 + .url("spec/") + .and_then(move |url| url.join(path).map_err(Error::from)) + .map_err(Into::into) + } + + pub fn get_eth2_config(&self) -> impl Future { + let client = self.0.clone(); + self.url("eth2_config") + .into_future() + .and_then(move |url| client.json_get(url, vec![])) + } +} + #[derive(Deserialize)] #[serde(bound = "T: EthSpec")] pub struct BlockResponse { diff --git a/eth2/utils/slot_clock/src/lib.rs b/eth2/utils/slot_clock/src/lib.rs index d31a1dc82e..b85b5a72c7 100644 --- a/eth2/utils/slot_clock/src/lib.rs +++ b/eth2/utils/slot_clock/src/lib.rs @@ -28,4 +28,7 @@ pub trait SlotClock: Send + Sync + Sized { /// Returns the duration until the next slot. fn duration_to_next_slot(&self) -> Option; + + /// Returns the duration until the first slot of the next epoch. + fn duration_to_next_epoch(&self, slots_per_epoch: u64) -> Option; } diff --git a/eth2/utils/slot_clock/src/system_time_slot_clock.rs b/eth2/utils/slot_clock/src/system_time_slot_clock.rs index 23159e79d7..600a419bd2 100644 --- a/eth2/utils/slot_clock/src/system_time_slot_clock.rs +++ b/eth2/utils/slot_clock/src/system_time_slot_clock.rs @@ -65,6 +65,35 @@ impl SlotClock for SystemTimeSlotClock { } } + fn duration_to_next_epoch(&self, slots_per_epoch: u64) -> Option { + let now = SystemTime::now().duration_since(UNIX_EPOCH).ok()?; + let genesis = self.genesis_duration; + + let slot_start = |slot: Slot| -> Duration { + let slot = slot.as_u64() as u32; + genesis + slot * self.slot_duration + }; + + let slot = self + .now() + .map(|slot| slot.epoch(slots_per_epoch)) + .map(|epoch| (epoch + 1).start_slot(slots_per_epoch))?; + + if now >= genesis { + Some( + slot_start(self.now()? + 1) + .checked_sub(now) + .expect("The next epoch cannot start before now"), + ) + } else { + Some( + genesis + .checked_sub(now) + .expect("Control flow ensures genesis is greater than or equal to now"), + ) + } + } + fn slot_duration(&self) -> Duration { self.slot_duration } diff --git a/eth2/utils/slot_clock/src/testing_slot_clock.rs b/eth2/utils/slot_clock/src/testing_slot_clock.rs index 0697ec2bcd..88eb2eec28 100644 --- a/eth2/utils/slot_clock/src/testing_slot_clock.rs +++ b/eth2/utils/slot_clock/src/testing_slot_clock.rs @@ -37,6 +37,11 @@ impl SlotClock for TestingSlotClock { Some(Duration::from_secs(1)) } + /// Always returns a duration of `1 * slots_per_epoch` second. + fn duration_to_next_epoch(&self, slots_per_epoch: u64) -> Option { + Some(Duration::from_secs(1 + slots_per_epoch)) + } + /// Always returns a slot duration of 0 seconds. fn slot_duration(&self) -> Duration { Duration::from_secs(0) diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs index 74c261840c..a24aba38a1 100644 --- a/lighthouse/src/main.rs +++ b/lighthouse/src/main.rs @@ -153,7 +153,12 @@ fn run( { let runtime_context = environment.core_context(); - let validator = ProductionValidatorClient::new_from_cli(runtime_context, sub_matches) + let validator = environment + .runtime() + .block_on(ProductionValidatorClient::new_from_cli( + runtime_context, + sub_matches, + )) .map_err(|e| format!("Failed to init validator client: {}", e))?; validator diff --git a/tests/node_test_rig/src/lib.rs b/tests/node_test_rig/src/lib.rs index 2b263cf90b..0c3565a161 100644 --- a/tests/node_test_rig/src/lib.rs +++ b/tests/node_test_rig/src/lib.rs @@ -121,8 +121,9 @@ impl LocalValidatorClient { fn new(context: RuntimeContext, mut config: ValidatorConfig, datadir: TempDir) -> Self { config.data_dir = datadir.path().into(); - let client = - ProductionValidatorClient::new(context, config).expect("should start validator client"); + let client = ProductionValidatorClient::new(context, config) + .wait() + .expect("should start validator client"); client .start_service() diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index dda3b6d1db..6dcaa42e88 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -42,3 +42,4 @@ eth2_ssz_derive = { path = "../eth2/utils/ssz_derive" } hex = "0.4" deposit_contract = { path = "../eth2/utils/deposit_contract" } bls = { path = "../eth2/utils/bls" } +remote_beacon_node = { path = "../eth2/utils/remote_beacon_node" } diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs new file mode 100644 index 0000000000..a6e91d4e45 --- /dev/null +++ b/validator_client/src/block_service.rs @@ -0,0 +1,245 @@ +use crate::{ + duties_service::DutiesService, fork_service::ForkService, validator_store::ValidatorStore, +}; +use environment::RuntimeContext; +use exit_future::Signal; +use futures::{stream, Future, IntoFuture, Stream}; +use remote_beacon_node::{PublishStatus, RemoteBeaconNode}; +use slog::{error, info, trace, warn}; +use slot_clock::SlotClock; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::timer::Interval; +use types::{ChainSpec, EthSpec}; + +/// Delay this period of time after the slot starts. This allows the node to process the new slot. +const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100); + +#[derive(Clone)] +pub struct BlockServiceBuilder { + fork_service: Option>, + duties_service: Option>, + validator_store: Option>, + slot_clock: Option>, + beacon_node: Option>, + context: Option>, +} + +// TODO: clean trait bounds. +impl BlockServiceBuilder { + pub fn new() -> Self { + Self { + fork_service: None, + duties_service: None, + validator_store: None, + slot_clock: None, + beacon_node: None, + context: None, + } + } + + pub fn fork_service(mut self, service: ForkService) -> Self { + self.fork_service = Some(service); + self + } + + pub fn duties_service(mut self, service: DutiesService) -> Self { + self.duties_service = Some(service); + self + } + + pub fn validator_store(mut self, store: ValidatorStore) -> Self { + self.validator_store = Some(store); + self + } + + pub fn slot_clock(mut self, slot_clock: T) -> Self { + self.slot_clock = Some(Arc::new(slot_clock)); + self + } + + pub fn beacon_node(mut self, beacon_node: RemoteBeaconNode) -> Self { + self.beacon_node = Some(beacon_node); + self + } + + pub fn runtime_context(mut self, context: RuntimeContext) -> Self { + self.context = Some(context); + self + } + + pub fn build(self) -> Result, String> { + Ok(BlockService { + fork_service: self + .fork_service + .ok_or_else(|| "Cannot build BlockService without fork_service")?, + duties_service: self + .duties_service + .ok_or_else(|| "Cannot build BlockService without duties_service")?, + validator_store: self + .validator_store + .ok_or_else(|| "Cannot build BlockService without validator_store")?, + slot_clock: self + .slot_clock + .ok_or_else(|| "Cannot build BlockService without slot_clock")?, + beacon_node: self + .beacon_node + .ok_or_else(|| "Cannot build BlockService without beacon_node")?, + context: self + .context + .ok_or_else(|| "Cannot build BlockService without runtime_context")?, + }) + } +} + +#[derive(Clone)] +pub struct BlockService { + duties_service: DutiesService, + fork_service: ForkService, + validator_store: ValidatorStore, + slot_clock: Arc, + beacon_node: RemoteBeaconNode, + context: RuntimeContext, +} + +// TODO: clean trait bounds. +impl BlockService { + pub fn start_update_service(&self, spec: &ChainSpec) -> Result { + let log = self.context.log.clone(); + + let duration_to_next_slot = self + .slot_clock + .duration_to_next_slot() + .ok_or_else(|| "Unable to determine duration to next slot".to_string())?; + + let interval = { + let slot_duration = Duration::from_millis(spec.milliseconds_per_slot); + Interval::new( + Instant::now() + duration_to_next_slot + TIME_DELAY_FROM_SLOT, + slot_duration, + ) + }; + + info!( + log, + "Waiting for next slot"; + "seconds_to_wait" => duration_to_next_slot.as_secs() + ); + + let (exit_signal, exit_fut) = exit_future::signal(); + let service = self.clone(); + + self.context.executor.spawn( + interval + .map_err(move |e| { + error! { + log, + "Timer thread failed"; + "error" => format!("{}", e) + } + }) + .and_then(move |_| if exit_fut.is_live() { Ok(()) } else { Err(()) }) + .for_each(move |_| service.clone().do_update()), + ); + + Ok(exit_signal) + } + + fn do_update(self) -> impl Future { + let service = self.clone(); + let log = self.context.log.clone(); + + self.slot_clock + .now() + .ok_or_else(move || { + error!(log, "Duties manager failed to read slot clock"); + }) + .into_future() + .and_then(move |slot| { + let iter = service.duties_service.block_producers(slot).into_iter(); + + stream::unfold(iter, move |mut block_producers| { + let log_1 = service.context.log.clone(); + let log_2 = service.context.log.clone(); + let service_1 = service.clone(); + let service_2 = service.clone(); + let service_3 = service.clone(); + + block_producers.next().map(move |validator_pubkey| { + service_2 + .fork_service + .fork() + .ok_or_else(|| "Fork is unknown, unable to sign".to_string()) + .and_then(|fork| { + service_1 + .validator_store + .randao_reveal( + &validator_pubkey, + slot.epoch(E::slots_per_epoch()), + &fork, + ) + .map(|randao_reveal| (fork, randao_reveal)) + .ok_or_else(|| "Unable to produce randao reveal".to_string()) + }) + .into_future() + .and_then(move |(fork, randao_reveal)| { + service_1 + .beacon_node + .http + .validator() + .produce_block(slot, randao_reveal) + .map(|block| (fork, block)) + .map_err(|e| { + format!( + "Error from beacon node when producing block: {:?}", + e + ) + }) + }) + .and_then(move |(fork, block)| { + service_2 + .validator_store + .sign_block(&validator_pubkey, block, &fork) + .ok_or_else(|| "Unable to sign block".to_string()) + }) + .and_then(move |block| { + service_3 + .beacon_node + .http + .validator() + .publish_block(block) + .map_err(|e| { + format!( + "Error from beacon node when publishing block: {:?}", + e + ) + }) + }) + .map(move |publish_outcome| match publish_outcome { + PublishStatus::Valid => { + info!(log_1, "Successfully published block") + } + PublishStatus::Invalid(msg) => error!( + log_1, + "Published block was invalid"; + "message" => msg + ), + PublishStatus::Unknown => { + info!(log_1, "Unknown condition when publishing block") + } + }) + .map_err(move |e| { + error!( + log_2, + "Error whilst producing block"; + "message" => e + ) + }) + .then(|_| Ok(((), block_producers))) + }) + }) + .collect() + .map(|_| ()) + }) + } +} diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs new file mode 100644 index 0000000000..8d3f24b783 --- /dev/null +++ b/validator_client/src/duties_service.rs @@ -0,0 +1,307 @@ +use crate::validator_store::ValidatorStore; +use environment::RuntimeContext; +use exit_future::Signal; +use futures::{Future, IntoFuture, Stream}; +use parking_lot::RwLock; +use remote_beacon_node::{RemoteBeaconNode, ValidatorDuty}; +use slog::{error, info, trace, warn}; +use slot_clock::SlotClock; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::timer::Interval; +use types::{ChainSpec, Epoch, EthSpec, PublicKey, Slot}; + +/// Delay this period of time after the slot starts. This allows the node to process the new slot. +const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100); + +type BaseHashMap = HashMap>; + +enum InsertOutcome { + New, + Identical, + Replaced, +} + +#[derive(Default)] +pub struct DutiesStore { + store: RwLock, +} + +impl DutiesStore { + fn block_producers(&self, slot: Slot) -> Vec { + self.store + .read() + .iter() + // As long as a `HashMap` iterator does not return duplicate keys, neither will this + // function. + .filter(|(_validator_pubkey, validator_map)| { + validator_map.iter().any(|(_epoch, duties)| { + duties + .block_proposal_slot + .map(|proposal_slot| proposal_slot == slot) + .unwrap_or_else(|| false) + }) + }) + .map(|(validator_pubkey, _validator_map)| validator_pubkey) + .cloned() + .collect() + } + + fn insert(&self, epoch: Epoch, duties: ValidatorDuty) -> InsertOutcome { + let mut store = self.store.write(); + + if store.contains_key(&duties.validator_pubkey) { + let validator_map = store.get_mut(&duties.validator_pubkey).expect( + "Store is exclusively locked and this path is guarded to ensure the key exists.", + ); + + // TODO: validate that the slots in the duties are all in the given epoch. + + if validator_map.contains_key(&epoch) { + let known_duties = validator_map.get_mut(&epoch).expect( + "Validator map is exclusively mutable and this path is guarded to ensure the key exists.", + ); + + if *known_duties == duties { + InsertOutcome::Identical + } else { + *known_duties = duties; + InsertOutcome::Replaced + } + } else { + validator_map.insert(epoch, duties); + + InsertOutcome::New + } + } else { + let validator_pubkey = duties.validator_pubkey.clone(); + + let mut validator_map = HashMap::new(); + validator_map.insert(epoch, duties); + + store.insert(validator_pubkey, validator_map); + + InsertOutcome::New + } + } + + // TODO: call this. + fn prune(&self, prior_to: Epoch) { + self.store + .write() + .retain(|_validator_pubkey, validator_map| { + validator_map.retain(|epoch, _duties| *epoch >= prior_to); + !validator_map.is_empty() + }); + } +} + +#[derive(Clone)] +pub struct DutiesServiceBuilder { + store: Option>, + validator_store: Option>, + slot_clock: Option>, + beacon_node: Option>, + context: Option>, +} + +// TODO: clean trait bounds. +impl DutiesServiceBuilder { + pub fn new() -> Self { + Self { + store: None, + validator_store: None, + slot_clock: None, + beacon_node: None, + context: None, + } + } + + pub fn validator_store(mut self, store: ValidatorStore) -> Self { + self.validator_store = Some(store); + self + } + + pub fn slot_clock(mut self, slot_clock: T) -> Self { + self.slot_clock = Some(Arc::new(slot_clock)); + self + } + + pub fn beacon_node(mut self, beacon_node: RemoteBeaconNode) -> Self { + self.beacon_node = Some(beacon_node); + self + } + + pub fn runtime_context(mut self, context: RuntimeContext) -> Self { + self.context = Some(context); + self + } + + pub fn build(self) -> Result, String> { + Ok(DutiesService { + store: Arc::new(DutiesStore::default()), + validator_store: self + .validator_store + .ok_or_else(|| "Cannot build DutiesService without validator_store")?, + slot_clock: self + .slot_clock + .ok_or_else(|| "Cannot build DutiesService without slot_clock")?, + beacon_node: self + .beacon_node + .ok_or_else(|| "Cannot build DutiesService without beacon_node")?, + context: self + .context + .ok_or_else(|| "Cannot build DutiesService without runtime_context")?, + }) + } +} + +#[derive(Clone)] +pub struct DutiesService { + store: Arc, + validator_store: ValidatorStore, + slot_clock: Arc, + beacon_node: RemoteBeaconNode, + context: RuntimeContext, +} + +impl DutiesService { + /// Returns the pubkeys of the validators which are assigned to propose in the given slot. + /// + /// In normal cases, there should be 0 or 1 validators returned. In extreme cases (i.e., deep forking) + /// + /// It is possible that multiple validators have an identical proposal slot, however that is + /// likely the result of heavy forking (lol) or inconsistent beacon node connections. + pub fn block_producers(&self, slot: Slot) -> Vec { + self.store.block_producers(slot) + } + + pub fn start_update_service(&self, spec: &ChainSpec) -> Result { + let log = self.context.log.clone(); + + let duration_to_next_slot = self + .slot_clock + .duration_to_next_slot() + .ok_or_else(|| "Unable to determine duration to next slot".to_string())?; + + let interval = { + let slot_duration = Duration::from_millis(spec.milliseconds_per_slot); + Interval::new( + Instant::now() + duration_to_next_slot + TIME_DELAY_FROM_SLOT, + slot_duration, + ) + }; + + info!( + log, + "Waiting for next slot"; + "seconds_to_wait" => duration_to_next_slot.as_secs() + ); + + let (exit_signal, exit_fut) = exit_future::signal(); + let service = self.clone(); + + // Run an immediate update before starting the updater service. + self.context.executor.spawn(service.clone().do_update()); + + self.context.executor.spawn( + interval + .map_err(move |e| { + error! { + log, + "Timer thread failed"; + "error" => format!("{}", e) + } + }) + .and_then(move |_| if exit_fut.is_live() { Ok(()) } else { Err(()) }) + .for_each(move |_| service.clone().do_update()), + ); + + Ok(exit_signal) + } + + fn do_update(self) -> impl Future { + let slots_per_epoch = E::slots_per_epoch(); + let service_1 = self.clone(); + let service_2 = self.clone(); + let log = self.context.log.clone(); + + self.slot_clock + .now() + .ok_or_else(move || { + error!(log, "Duties manager failed to read slot clock"); + }) + .into_future() + .map(move |slot| slot.epoch(slots_per_epoch)) + .and_then(move |epoch| { + let log = service_1.context.log.clone(); + service_1.update_epoch(epoch).then(move |result| { + if let Err(e) = result { + error!( + log, + "Failed to get current epoch duties"; + "http_error" => format!("{:?}", e) + ); + } + + let log = service_2.context.log.clone(); + service_2.update_epoch(epoch + 1).map_err(move |e| { + error!( + log, + "Failed to get next epoch duties"; + "http_error" => format!("{:?}", e) + ); + }) + }) + }) + .map(|_| ()) + // Returning an error will stop the interval. This is not desired, a single failure + // should not stop all future attempts. + .then(|_| Ok(())) + } + + fn update_epoch(self, epoch: Epoch) -> impl Future { + let service_1 = self.clone(); + let service_2 = self.clone(); + + let pubkeys = service_1.validator_store.voting_pubkeys(); + service_1 + .beacon_node + .http + .validator() + .get_duties(epoch, pubkeys.as_slice()) + .map(move |all_duties| (epoch, all_duties)) + .map_err(move |e| format!("Failed to get duties for epoch {}: {:?}", epoch, e)) + .map(move |(epoch, all_duties)| { + let mut new = 0; + let mut identical = 0; + let mut replaced = 0; + + all_duties.into_iter().for_each(|duties| { + match service_2.store.insert(epoch, duties) { + InsertOutcome::New => new += 1, + InsertOutcome::Identical => identical += 1, + InsertOutcome::Replaced => replaced += 1, + }; + }); + + trace!( + service_2.context.log, + "Performed duties update"; + "replaced_duties" => replaced, + "identical_duties" => identical, + "new_duties" => new, + "epoch" => format!("{}", epoch) + ); + + if replaced > 0 { + warn!( + service_2.context.log, + "Duties changed during routine update"; + "info" => "Chain re-org likely occurred." + ) + } + }) + } +} diff --git a/validator_client/src/fork_service.rs b/validator_client/src/fork_service.rs new file mode 100644 index 0000000000..823b948396 --- /dev/null +++ b/validator_client/src/fork_service.rs @@ -0,0 +1,157 @@ +use environment::RuntimeContext; +use exit_future::Signal; +use futures::{Future, Stream}; +use parking_lot::RwLock; +use remote_beacon_node::RemoteBeaconNode; +use slog::{error, info, trace}; +use slot_clock::SlotClock; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::timer::Interval; +use types::{ChainSpec, EthSpec, Fork}; + +/// Delay this period of time after the slot starts. This allows the node to process the new slot. +const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(80); + +#[derive(Clone)] +pub struct ForkServiceBuilder { + fork: Option, + slot_clock: Option, + beacon_node: Option>, + context: Option>, +} + +// TODO: clean trait bounds. +impl ForkServiceBuilder { + pub fn new() -> Self { + Self { + fork: None, + slot_clock: None, + beacon_node: None, + context: None, + } + } + + pub fn slot_clock(mut self, slot_clock: T) -> Self { + self.slot_clock = Some(slot_clock); + self + } + + pub fn beacon_node(mut self, beacon_node: RemoteBeaconNode) -> Self { + self.beacon_node = Some(beacon_node); + self + } + + pub fn runtime_context(mut self, context: RuntimeContext) -> Self { + self.context = Some(context); + self + } + + pub fn build(self) -> Result, String> { + Ok(ForkService { + inner: Arc::new(Inner { + fork: RwLock::new(self.fork), + slot_clock: self + .slot_clock + .ok_or_else(|| "Cannot build ForkService without slot_clock")?, + beacon_node: self + .beacon_node + .ok_or_else(|| "Cannot build ForkService without beacon_node")?, + context: self + .context + .ok_or_else(|| "Cannot build ForkService without runtime_context")?, + }), + }) + } +} + +struct Inner { + fork: RwLock>, + beacon_node: RemoteBeaconNode, + context: RuntimeContext, + slot_clock: T, +} + +#[derive(Clone)] +pub struct ForkService { + inner: Arc>, +} + +// TODO: clean trait bounds. +impl ForkService { + pub fn fork(&self) -> Option { + self.inner.fork.read().clone() + } + + pub fn start_update_service(&self, spec: &ChainSpec) -> Result { + let log = self.inner.context.log.clone(); + + let duration_to_next_epoch = self + .inner + .slot_clock + .duration_to_next_epoch(E::slots_per_epoch()) + .ok_or_else(|| "Unable to determine duration to next epoch".to_string())?; + + let interval = { + let slot_duration = Duration::from_millis(spec.milliseconds_per_slot); + Interval::new( + Instant::now() + duration_to_next_epoch + TIME_DELAY_FROM_SLOT, + slot_duration * E::slots_per_epoch() as u32, + ) + }; + + info!( + log, + "Waiting for next slot"; + "seconds_to_wait" => duration_to_next_epoch.as_secs() + ); + + let (exit_signal, exit_fut) = exit_future::signal(); + let service = self.clone(); + + // Run an immediate update before starting the updater service. + self.inner + .context + .executor + .spawn(service.clone().do_update()); + + self.inner.context.executor.spawn( + interval + .map_err(move |e| { + error! { + log, + "Timer thread failed"; + "error" => format!("{}", e) + } + }) + .and_then(move |_| if exit_fut.is_live() { Ok(()) } else { Err(()) }) + .for_each(move |_| service.clone().do_update()), + ); + + Ok(exit_signal) + } + + fn do_update(self) -> impl Future { + let service_1 = self.inner.clone(); + let log_1 = service_1.context.log.clone(); + let log_2 = service_1.context.log.clone(); + + self.inner + .beacon_node + .http + .beacon() + .get_fork() + .map(move |fork| *(service_1.fork.write()) = Some(fork)) + .map(move |_| trace!(log_1, "Fork update success")) + .map_err(move |e| { + trace!( + log_2, + "Fork update failed"; + "error" => format!("Error retrieving fork: {:?}", e) + ) + }) + // Returning an error will stop the interval. This is not desired, a single failure + // should not stop all future attempts. + .then(|_| Ok(())) + } +} diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index d9b6f40443..863436ba3f 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -1,42 +1,44 @@ -mod attestation_producer; -mod block_producer; +mod block_service; mod cli; mod config; -mod duties; +mod duties_service; mod error; -mod service; +mod fork_service; mod signer; +mod validator_store; + pub mod validator_directory; pub use cli::cli_app; pub use config::Config; +use block_service::{BlockService, BlockServiceBuilder}; use clap::ArgMatches; use config::{Config as ClientConfig, KeySource}; +use duties_service::{DutiesService, DutiesServiceBuilder}; use environment::RuntimeContext; use eth2_config::Eth2Config; use exit_future::Signal; -use futures::Stream; +use fork_service::{ForkService, ForkServiceBuilder}; +use futures::{Future, IntoFuture}; use lighthouse_bootstrap::Bootstrapper; use parking_lot::RwLock; -use protos::services_grpc::ValidatorServiceClient; -use service::Service; -use slog::{error, info, warn, Logger}; +use remote_beacon_node::RemoteBeaconNode; +use slog::{info, Logger}; use slot_clock::SlotClock; +use slot_clock::SystemTimeSlotClock; use std::path::PathBuf; use std::sync::Arc; -use std::time::{Duration, Instant}; -use tokio::timer::Interval; -use types::{EthSpec, Keypair}; - -/// A fixed amount of time after a slot to perform operations. This gives the node time to complete -/// per-slot processes. -const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100); +use std::time::Duration; +use types::EthSpec; +use validator_store::ValidatorStore; #[derive(Clone)] pub struct ProductionValidatorClient { context: RuntimeContext, - service: Arc>, + duties_service: DutiesService, + fork_service: ForkService, + block_service: BlockService, exit_signals: Arc>>, } @@ -46,97 +48,156 @@ impl ProductionValidatorClient { pub fn new_from_cli( mut context: RuntimeContext, matches: &ArgMatches, - ) -> Result { + ) -> impl Future { let mut log = context.log.clone(); - let (config, eth2_config) = get_configs(&matches, &mut log) - .map_err(|e| format!("Unable to initialize config: {}", e))?; + get_configs(&matches, &mut log) + .into_future() + .map_err(|e| format!("Unable to initialize config: {}", e)) + .and_then(|(client_config, eth2_config)| { + // TODO: the eth2 config in the env is being completely ignored. + // + // See https://github.com/sigp/lighthouse/issues/602 + context.eth2_config = eth2_config; - // TODO: the eth2 config in the env is being completely ignored. - // - // See https://github.com/sigp/lighthouse/issues/602 - context.eth2_config = eth2_config; - - Self::new(context, config) + Self::new(context, client_config) + }) } /// Instantiates the validator client, _without_ starting the timers to trigger block /// and attestation production. - pub fn new(context: RuntimeContext, config: Config) -> Result { + pub fn new( + mut context: RuntimeContext, + client_config: ClientConfig, + ) -> impl Future { let log = context.log.clone(); info!( log, "Starting validator client"; - "datadir" => config.full_data_dir().expect("Unable to find datadir").to_str(), + "datadir" => client_config.full_data_dir().expect("Unable to find datadir").to_str(), ); - let service: Service = - Service::initialize_service(config, context.eth2_config.clone(), log.clone()) - .map_err(|e| e.to_string())?; + format!( + "{}:{}", + client_config.server, client_config.server_http_port + ) + .parse() + .map_err(|e| format!("Unable to parse server address: {:?}", e)) + .into_future() + .and_then(|http_server_addr| { + RemoteBeaconNode::new(http_server_addr) + .map_err(|e| format!("Unable to init beacon node http client: {}", e)) + }) + .and_then(|beacon_node| { + // TODO: add loop function to retry if node not online. + beacon_node + .http + .spec() + .get_eth2_config() + .map(|eth2_config| (beacon_node, eth2_config)) + .map_err(|e| format!("Unable to read eth2 config from beacon node: {:?}", e)) + }) + .and_then(|(beacon_node, eth2_config)| { + beacon_node + .http + .beacon() + .get_genesis_time() + .map(|genesis_time| (beacon_node, eth2_config, genesis_time)) + .map_err(|e| format!("Unable to read genesis time from beacon node: {:?}", e)) + }) + .and_then(move |(beacon_node, remote_eth2_config, genesis_time)| { + // Do not permit a connection to a beacon node using different spec constants. + if context.eth2_config.spec_constants != remote_eth2_config.spec_constants { + return Err(format!( + "Beacon node is using an incompatible spec. Got {}, expected {}", + remote_eth2_config.spec_constants, context.eth2_config.spec_constants + )); + } - Ok(Self { - context, - service: Arc::new(service), - exit_signals: Arc::new(RwLock::new(vec![])), + // Note: here we just assume the spec variables of the remote node. This is very useful + // for testnets, but perhaps a security issue when it comes to mainnet. + // + // A damaging attack would be for a beacon node to convince the validator client of a + // different `SLOTS_PER_EPOCH` variable. This could result in slashable messages being + // produced. We are safe from this because `SLOTS_PER_EPOCH` is a type-level constant + // for Lighthouse. + context.eth2_config = remote_eth2_config; + + let slot_clock = SystemTimeSlotClock::new( + context.eth2_config.spec.genesis_slot, + Duration::from_secs(genesis_time), + Duration::from_millis(context.eth2_config.spec.milliseconds_per_slot), + ); + + dbg!(context.eth2_config.spec.milliseconds_per_slot); + + // TODO: fix expect. + let validator_store = ValidatorStore::load_from_disk( + client_config.full_data_dir().expect("Get rid of this."), + context.eth2_config.spec.clone(), + log.clone(), + )?; + + info!( + log, + "Loaded validator keypair store"; + "voting_validators" => validator_store.num_voting_validators() + ); + + let duties_service = DutiesServiceBuilder::new() + .slot_clock(slot_clock.clone()) + .validator_store(validator_store.clone()) + .beacon_node(beacon_node.clone()) + .runtime_context(context.service_context("duties")) + .build()?; + + let fork_service = ForkServiceBuilder::new() + .slot_clock(slot_clock.clone()) + .beacon_node(beacon_node.clone()) + .runtime_context(context.service_context("fork")) + .build()?; + + let block_service = BlockServiceBuilder::new() + .duties_service(duties_service.clone()) + .fork_service(fork_service.clone()) + .slot_clock(slot_clock) + .validator_store(validator_store) + .beacon_node(beacon_node) + .runtime_context(context.service_context("block")) + .build()?; + + Ok(Self { + context, + duties_service, + fork_service, + block_service, + exit_signals: Arc::new(RwLock::new(vec![])), + }) }) } - /// Starts the timers to trigger block and attestation production. pub fn start_service(&self) -> Result<(), String> { - let service = self.clone().service; - let log = self.context.log.clone(); + let duties_exit = self + .duties_service + .start_update_service(&self.context.eth2_config.spec) + .map_err(|e| format!("Unable to start duties service: {}", e))?; - let duration_to_next_slot = service - .slot_clock - .duration_to_next_slot() - .ok_or_else(|| "Unable to determine duration to next slot. Exiting.".to_string())?; + self.exit_signals.write().push(duties_exit); - // set up the validator work interval - start at next slot and proceed every slot - let interval = { - // Set the interval to start at the next slot, and every slot after - let slot_duration = Duration::from_millis(service.spec.milliseconds_per_slot); - //TODO: Handle checked add correctly - Interval::new(Instant::now() + duration_to_next_slot, slot_duration) - }; + let fork_exit = self + .fork_service + .start_update_service(&self.context.eth2_config.spec) + .map_err(|e| format!("Unable to start fork service: {}", e))?; - if service.slot_clock.now().is_none() { - warn!( - log, - "Starting node prior to genesis"; - ); - } + self.exit_signals.write().push(fork_exit); - info!( - log, - "Waiting for next slot"; - "seconds_to_wait" => duration_to_next_slot.as_secs() - ); + let block_exit = self + .block_service + .start_update_service(&self.context.eth2_config.spec) + .map_err(|e| format!("Unable to start block service: {}", e))?; - let (exit_signal, exit_fut) = exit_future::signal(); - - self.exit_signals.write().push(exit_signal); - - /* kick off the core service */ - self.context.executor.spawn( - interval - .map_err(move |e| { - error! { - log, - "Timer thread failed"; - "error" => format!("{}", e) - } - }) - .and_then(move |_| if exit_fut.is_live() { Ok(()) } else { Err(()) }) - .for_each(move |_| { - // wait for node to process - std::thread::sleep(TIME_DELAY_FROM_SLOT); - // if a non-fatal error occurs, proceed to the next slot. - let _ignore_error = service.per_slot_execution(); - // completed a slot process - Ok(()) - }), - ); + self.exit_signals.write().push(block_exit); Ok(()) } diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs new file mode 100644 index 0000000000..81feb4af73 --- /dev/null +++ b/validator_client/src/validator_store.rs @@ -0,0 +1,109 @@ +use crate::validator_directory::ValidatorDirectory; +use parking_lot::RwLock; +use slog::{error, Logger}; +use std::collections::HashMap; +use std::fs::read_dir; +use std::iter::FromIterator; +use std::marker::PhantomData; +use std::path::PathBuf; +use std::sync::Arc; +use tree_hash::{SignedRoot, TreeHash}; +use types::{BeaconBlock, ChainSpec, Domain, Epoch, EthSpec, Fork, PublicKey, Signature}; + +#[derive(Clone)] +pub struct ValidatorStore { + validators: Arc>>, + spec: Arc, + _phantom: PhantomData, +} + +impl ValidatorStore { + pub fn load_from_disk(base_dir: PathBuf, spec: ChainSpec, log: Logger) -> Result { + let validator_iter = read_dir(&base_dir) + .map_err(|e| format!("Failed to read base directory: {:?}", e))? + .filter_map(|validator_dir| { + let path = validator_dir.ok()?.path(); + + if path.is_dir() { + match ValidatorDirectory::load_for_signing(path.clone()) { + Ok(validator_directory) => Some(validator_directory), + Err(e) => { + error!( + log, + "Failed to load a validator directory"; + "error" => e, + "path" => path.to_str(), + ); + None + } + } + } else { + None + } + }) + .filter_map(|validator_directory| { + validator_directory + .voting_keypair + .clone() + .map(|voting_keypair| (voting_keypair.pk, validator_directory)) + }); + + Ok(Self { + validators: Arc::new(RwLock::new(HashMap::from_iter(validator_iter))), + spec: Arc::new(spec), + _phantom: PhantomData, + }) + } + + pub fn voting_pubkeys(&self) -> Vec { + self.validators + .read() + .iter() + .map(|(pubkey, _dir)| pubkey.clone()) + .collect() + } + + pub fn num_voting_validators(&self) -> usize { + self.validators.read().len() + } + + pub fn randao_reveal( + &self, + validator_pubkey: &PublicKey, + epoch: Epoch, + fork: &Fork, + ) -> Option { + // TODO: check this against the slot clock to make sure it's not an early reveal? + self.validators + .read() + .get(validator_pubkey) + .and_then(|validator_dir| { + validator_dir.voting_keypair.as_ref().map(|voting_keypair| { + let message = epoch.tree_hash_root(); + let domain = self.spec.get_domain(epoch, Domain::Randao, &fork); + Signature::new(&message, domain, &voting_keypair.sk) + }) + }) + } + + pub fn sign_block( + &self, + validator_pubkey: &PublicKey, + mut block: BeaconBlock, + fork: &Fork, + ) -> Option> { + // TODO: check for slashing. + self.validators + .read() + .get(validator_pubkey) + .and_then(|validator_dir| { + validator_dir.voting_keypair.as_ref().map(|voting_keypair| { + let epoch = block.slot.epoch(E::slots_per_epoch()); + let message = block.signed_root(); + let domain = self.spec.get_domain(epoch, Domain::BeaconProposer, &fork); + block.signature = Signature::new(&message, domain, &voting_keypair.sk); + block + }) + }) + } +}