From 7991bbcc22cb5f754e8d3390a0ff39e4e0e0f059 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 23 Nov 2019 20:42:07 +1100 Subject: [PATCH] Tidy val client --- validator_client/src/attestation_service.rs | 115 ++++++++++++-------- validator_client/src/block_service.rs | 66 +++++++---- validator_client/src/duties_service.rs | 70 +++++++----- validator_client/src/fork_service.rs | 47 ++++---- validator_client/src/validator_store.rs | 8 +- 5 files changed, 187 insertions(+), 119 deletions(-) diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index 1fe5a488c4..67920f9cf5 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -3,19 +3,17 @@ use environment::RuntimeContext; use exit_future::Signal; use futures::{Future, Stream}; use remote_beacon_node::{PublishStatus, RemoteBeaconNode, ValidatorDuty}; -use slog::{error, info, trace}; +use slog::{crit, info, trace}; use slot_clock::SlotClock; use std::collections::HashMap; +use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::timer::Interval; use types::{ChainSpec, CommitteeIndex, EthSpec, Slot}; -/// Delay this period of time after the slot starts. This allows the node to process the new slot. -const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100); - -#[derive(Clone)] -pub struct AttestationServiceBuilder { +/// Builds an `AttestationService`. +pub struct AttestationServiceBuilder { duties_service: Option>, validator_store: Option>, slot_clock: Option, @@ -23,8 +21,7 @@ pub struct AttestationServiceBuilder { context: Option>, } -// TODO: clean trait bounds. -impl AttestationServiceBuilder { +impl AttestationServiceBuilder { pub fn new() -> Self { Self { duties_service: None, @@ -83,7 +80,8 @@ impl AttestationServiceBuilder } } -pub struct Inner { +/// Helper to minimise `Arc` usage. +pub struct Inner { duties_service: DutiesService, validator_store: ValidatorStore, slot_clock: T, @@ -91,19 +89,38 @@ pub struct Inner { context: RuntimeContext, } -#[derive(Clone)] -pub struct AttestationService { +/// Attempts to produce attestations for all known validators 1/3rd of the way through each slot. +/// +/// If any validators are on the same committee, a single attestation will be downloaded and +/// returned to the beacon node. This attestation will have a signature from each of the +/// validators. +pub struct AttestationService { inner: Arc>, } -// TODO: clean trait bounds. -impl AttestationService { +impl Clone for AttestationService { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl Deref for AttestationService { + type Target = Inner; + + fn deref(&self) -> &Self::Target { + self.inner.deref() + } +} + +impl AttestationService { + /// Starts the service which periodically produces attestations. pub fn start_update_service(&self, spec: &ChainSpec) -> Result { - let context = &self.inner.context; + let context = &self.context; let log = context.log.clone(); let duration_to_next_slot = self - .inner .slot_clock .duration_to_next_slot() .ok_or_else(|| "Unable to determine duration to next slot".to_string())?; @@ -111,7 +128,7 @@ impl AttestationService { let interval = { let slot_duration = Duration::from_millis(spec.milliseconds_per_slot); Interval::new( - Instant::now() + duration_to_next_slot * 3 / 2 + TIME_DELAY_FROM_SLOT, + Instant::now() + duration_to_next_slot + slot_duration / 3, slot_duration, ) }; @@ -127,15 +144,15 @@ impl AttestationService { .until( interval .map_err(move |e| { - error! { + crit! { log_1, "Timer thread failed"; "error" => format!("{}", e) } }) .for_each(move |_| { - if let Err(e) = service.clone().spawn_attestation_tasks() { - error!( + if let Err(e) = service.spawn_attestation_tasks() { + crit!( log_2, "Failed to spawn attestation tasks"; "error" => e @@ -158,17 +175,19 @@ impl AttestationService { Ok(exit_signal) } + /// For each each required attestation, spawn a new task that downloads, signs and uploads the + /// attestation to the beacon node. fn spawn_attestation_tasks(&self) -> Result<(), String> { - let inner = self.inner.clone(); + let service = self.clone(); - let slot = inner + let slot = service .slot_clock .now() .ok_or_else(|| "Failed to read slot clock".to_string())?; let mut committee_indices: HashMap> = HashMap::new(); - inner + service .duties_service .attesters(slot) .into_iter() @@ -185,7 +204,7 @@ impl AttestationService { .into_iter() .for_each(|(committee_index, validator_duties)| { // Spawn a separate task for each attestation. - inner.context.executor.spawn(self.clone().do_attestation( + service.context.executor.spawn(self.clone().do_attestation( slot, committee_index, validator_duties, @@ -195,28 +214,29 @@ impl AttestationService { Ok(()) } + /// For a given `committee_index`, download the attestation, have it signed by all validators + /// in `validator_duties` then upload it. fn do_attestation( - self, + &self, slot: Slot, committee_index: CommitteeIndex, validator_duties: Vec, ) -> impl Future { - let inner_1 = self.inner.clone(); - let inner_2 = self.inner.clone(); - let log_1 = self.inner.context.log.clone(); - let log_2 = self.inner.context.log.clone(); + let service_1 = self.clone(); + let service_2 = self.clone(); + let log_1 = self.context.log.clone(); + let log_2 = self.context.log.clone(); - self.inner - .beacon_node + self.beacon_node .http .validator() .produce_attestation(slot, committee_index) .map_err(|e| format!("Failed to produce attestation: {:?}", e)) - .and_then(move |attestation| { + .map(move |attestation| { validator_duties .iter() - .try_fold(attestation, |attestation, duty| { - let log = inner_1.context.log.clone(); + .fold(attestation, |mut attestation, duty| { + let log = service_1.context.log.clone(); if let Some(( duty_slot, @@ -225,28 +245,29 @@ impl AttestationService { )) = attestation_duties(duty) { if duty_slot == slot && duty_committee_index == committee_index { - inner_1 + if service_1 .validator_store .sign_attestation( &duty.validator_pubkey, validator_committee_position, - attestation, + &mut attestation, ) - .ok_or_else(|| "Unable to sign attestation".to_string()) + .is_none() + { + crit!(log, "Failed to sign attestation"); + } } else { - error!(log, "Inconsistent validator duties during signing"); - - Ok(attestation) + crit!(log, "Inconsistent validator duties during signing"); } } else { - error!(log, "Missing validator duties when signing"); - - Ok(attestation) + crit!(log, "Missing validator duties when signing"); } + + attestation }) }) .and_then(move |attestation| { - inner_2 + service_2 .beacon_node .http .validator() @@ -263,7 +284,7 @@ impl AttestationService { "committee_index" => attestation.data.index, "slot" => attestation.data.slot.as_u64(), ), - PublishStatus::Invalid(msg) => error!( + PublishStatus::Invalid(msg) => crit!( log_1, "Published attestation was invalid"; "message" => msg, @@ -271,11 +292,11 @@ impl AttestationService { "slot" => attestation.data.slot.as_u64(), ), PublishStatus::Unknown => { - info!(log_1, "Unknown condition when publishing attestation") + crit!(log_1, "Unknown condition when publishing attestation") } }) .map_err(move |e| { - error!( + crit!( log_2, "Error during attestation production"; "error" => e @@ -284,7 +305,7 @@ impl AttestationService { } } -pub fn attestation_duties(duty: &ValidatorDuty) -> Option<(Slot, CommitteeIndex, usize)> { +fn attestation_duties(duty: &ValidatorDuty) -> Option<(Slot, CommitteeIndex, usize)> { Some(( duty.attestation_slot?, duty.attestation_committee_index?, diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index 5f283fd6d8..068a94e1c8 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -5,6 +5,7 @@ use futures::{stream, Future, IntoFuture, Stream}; use remote_beacon_node::{PublishStatus, RemoteBeaconNode}; use slog::{error, info}; use slot_clock::SlotClock; +use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::timer::Interval; @@ -13,8 +14,8 @@ use types::{ChainSpec, EthSpec}; /// Delay this period of time after the slot starts. This allows the node to process the new slot. const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100); -#[derive(Clone)] -pub struct BlockServiceBuilder { +/// Builds a `BlockService`. +pub struct BlockServiceBuilder { duties_service: Option>, validator_store: Option>, slot_clock: Option>, @@ -22,8 +23,7 @@ pub struct BlockServiceBuilder { context: Option>, } -// TODO: clean trait bounds. -impl BlockServiceBuilder { +impl BlockServiceBuilder { pub fn new() -> Self { Self { duties_service: None, @@ -61,27 +61,28 @@ impl BlockServiceBuilder { pub fn build(self) -> Result, String> { Ok(BlockService { - duties_service: self - .duties_service - .ok_or_else(|| "Cannot build BlockService without duties_service")?, - validator_store: self - .validator_store - .ok_or_else(|| "Cannot build BlockService without validator_store")?, - slot_clock: self - .slot_clock - .ok_or_else(|| "Cannot build BlockService without slot_clock")?, - beacon_node: self - .beacon_node - .ok_or_else(|| "Cannot build BlockService without beacon_node")?, - context: self - .context - .ok_or_else(|| "Cannot build BlockService without runtime_context")?, + inner: Arc::new(Inner { + duties_service: self + .duties_service + .ok_or_else(|| "Cannot build BlockService without duties_service")?, + validator_store: self + .validator_store + .ok_or_else(|| "Cannot build BlockService without validator_store")?, + slot_clock: self + .slot_clock + .ok_or_else(|| "Cannot build BlockService without slot_clock")?, + beacon_node: self + .beacon_node + .ok_or_else(|| "Cannot build BlockService without beacon_node")?, + context: self + .context + .ok_or_else(|| "Cannot build BlockService without runtime_context")?, + }), }) } } -#[derive(Clone)] -pub struct BlockService { +pub struct Inner { duties_service: DutiesService, validator_store: ValidatorStore, slot_clock: Arc, @@ -89,8 +90,27 @@ pub struct BlockService { context: RuntimeContext, } -// TODO: clean trait bounds. -impl BlockService { +pub struct BlockService { + inner: Arc>, +} + +impl Clone for BlockService { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl Deref for BlockService { + type Target = Inner; + + fn deref(&self) -> &Self::Target { + self.inner.deref() + } +} + +impl BlockService { pub fn start_update_service(&self, spec: &ChainSpec) -> Result { let log = self.context.log.clone(); diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 12b0d1b17d..6f6b0eb5d9 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -7,6 +7,7 @@ use remote_beacon_node::{RemoteBeaconNode, ValidatorDuty}; use slog::{error, info, trace, warn}; use slot_clock::SlotClock; use std::collections::HashMap; +use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::timer::Interval; @@ -121,20 +122,16 @@ impl DutiesStore { } } -#[derive(Clone)] -pub struct DutiesServiceBuilder { - store: Option>, +pub struct DutiesServiceBuilder { validator_store: Option>, - slot_clock: Option>, + slot_clock: Option, beacon_node: Option>, context: Option>, } -// TODO: clean trait bounds. -impl DutiesServiceBuilder { +impl DutiesServiceBuilder { pub fn new() -> Self { Self { - store: None, validator_store: None, slot_clock: None, beacon_node: None, @@ -148,7 +145,7 @@ impl DutiesServiceBuilder { } pub fn slot_clock(mut self, slot_clock: T) -> Self { - self.slot_clock = Some(Arc::new(slot_clock)); + self.slot_clock = Some(slot_clock); self } @@ -164,33 +161,54 @@ impl DutiesServiceBuilder { pub fn build(self) -> Result, String> { Ok(DutiesService { - store: Arc::new(DutiesStore::default()), - validator_store: self - .validator_store - .ok_or_else(|| "Cannot build DutiesService without validator_store")?, - slot_clock: self - .slot_clock - .ok_or_else(|| "Cannot build DutiesService without slot_clock")?, - beacon_node: self - .beacon_node - .ok_or_else(|| "Cannot build DutiesService without beacon_node")?, - context: self - .context - .ok_or_else(|| "Cannot build DutiesService without runtime_context")?, + inner: Arc::new(Inner { + store: Arc::new(DutiesStore::default()), + validator_store: self + .validator_store + .ok_or_else(|| "Cannot build DutiesService without validator_store")?, + slot_clock: self + .slot_clock + .ok_or_else(|| "Cannot build DutiesService without slot_clock")?, + beacon_node: self + .beacon_node + .ok_or_else(|| "Cannot build DutiesService without beacon_node")?, + context: self + .context + .ok_or_else(|| "Cannot build DutiesService without runtime_context")?, + }), }) } } -#[derive(Clone)] -pub struct DutiesService { +pub struct Inner { store: Arc, validator_store: ValidatorStore, - slot_clock: Arc, + slot_clock: T, beacon_node: RemoteBeaconNode, context: RuntimeContext, } -impl DutiesService { +pub struct DutiesService { + inner: Arc>, +} + +impl Clone for DutiesService { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl Deref for DutiesService { + type Target = Inner; + + fn deref(&self) -> &Self::Target { + self.inner.deref() + } +} + +impl DutiesService { /// Returns the pubkeys of the validators which are assigned to propose in the given slot. /// /// In normal cases, there should be 0 or 1 validators returned. In extreme cases (i.e., deep forking) @@ -251,7 +269,7 @@ impl DutiesService { Ok(exit_signal) } - fn do_update(self) -> impl Future { + fn do_update(&self) -> impl Future { let service_1 = self.clone(); let service_2 = self.clone(); let service_3 = self.clone(); diff --git a/validator_client/src/fork_service.rs b/validator_client/src/fork_service.rs index 04b922b687..8c0cbd041a 100644 --- a/validator_client/src/fork_service.rs +++ b/validator_client/src/fork_service.rs @@ -5,6 +5,7 @@ use parking_lot::RwLock; use remote_beacon_node::RemoteBeaconNode; use slog::{error, info, trace}; use slot_clock::SlotClock; +use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::timer::Interval; @@ -13,16 +14,14 @@ use types::{ChainSpec, EthSpec, Fork}; /// Delay this period of time after the slot starts. This allows the node to process the new slot. const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(80); -#[derive(Clone)] -pub struct ForkServiceBuilder { +pub struct ForkServiceBuilder { fork: Option, slot_clock: Option, beacon_node: Option>, context: Option>, } -// TODO: clean trait bounds. -impl ForkServiceBuilder { +impl ForkServiceBuilder { pub fn new() -> Self { Self { fork: None, @@ -65,29 +64,42 @@ impl ForkServiceBuilder { } } -struct Inner { +pub struct Inner { fork: RwLock>, beacon_node: RemoteBeaconNode, context: RuntimeContext, slot_clock: T, } -#[derive(Clone)] pub struct ForkService { inner: Arc>, } -// TODO: clean trait bounds. -impl ForkService { +impl Clone for ForkService { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl Deref for ForkService { + type Target = Inner; + + fn deref(&self) -> &Self::Target { + self.inner.deref() + } +} + +impl ForkService { pub fn fork(&self) -> Option { - self.inner.fork.read().clone() + self.fork.read().clone() } pub fn start_update_service(&self, spec: &ChainSpec) -> Result { - let log = self.inner.context.log.clone(); + let log = self.context.log.clone(); let duration_to_next_epoch = self - .inner .slot_clock .duration_to_next_epoch(E::slots_per_epoch()) .ok_or_else(|| "Unable to determine duration to next epoch".to_string())?; @@ -106,12 +118,9 @@ impl ForkService { let log_2 = log.clone(); // Run an immediate update before starting the updater service. - self.inner - .context - .executor - .spawn(service.clone().do_update()); + self.context.executor.spawn(service.clone().do_update()); - self.inner.context.executor.spawn( + self.context.executor.spawn( exit_fut .until( interval @@ -122,7 +131,7 @@ impl ForkService { "error" => format!("{}", e) } }) - .for_each(move |_| service.clone().do_update()) + .for_each(move |_| service.do_update()) // Prevent any errors from escaping and stopping the interval. .then(|_| Ok(())), ) @@ -132,8 +141,8 @@ impl ForkService { Ok(exit_signal) } - fn do_update(self) -> impl Future { - let service_1 = self.inner.clone(); + fn do_update(&self) -> impl Future { + let service_1 = self.clone(); let log_1 = service_1.context.log.clone(); let log_2 = service_1.context.log.clone(); diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index a730e79081..892b015ee7 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -27,7 +27,7 @@ pub struct ValidatorStore { _phantom: PhantomData, } -impl ValidatorStore { +impl ValidatorStore { pub fn load_from_disk( base_dir: PathBuf, spec: ChainSpec, @@ -171,8 +171,8 @@ impl ValidatorStore { &self, validator_pubkey: &PublicKey, validator_committee_position: usize, - mut attestation: Attestation, - ) -> Option> { + attestation: &mut Attestation, + ) -> Option<()> { // TODO: check for slashing. self.validators .read() @@ -196,7 +196,7 @@ impl ValidatorStore { }) .ok()?; - Some(attestation) + Some(()) }) } }