diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs new file mode 100644 index 0000000000..a7b881b53d --- /dev/null +++ b/validator_client/src/attestation_service.rs @@ -0,0 +1,245 @@ +use crate::{ + duties_service::DutiesService, fork_service::ForkService, validator_store::ValidatorStore, +}; +use environment::RuntimeContext; +use exit_future::Signal; +use futures::{stream, Future, IntoFuture, Stream}; +use remote_beacon_node::{PublishStatus, RemoteBeaconNode}; +use slog::{error, info, trace, warn}; +use slot_clock::SlotClock; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::timer::Interval; +use types::{ChainSpec, EthSpec}; + +/// Delay this period of time after the slot starts. This allows the node to process the new slot. +const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100); + +#[derive(Clone)] +pub struct AttestationServiceBuilder { + fork_service: Option>, + duties_service: Option>, + validator_store: Option>, + slot_clock: Option>, + beacon_node: Option>, + context: Option>, +} + +// TODO: clean trait bounds. +impl AttestationServiceBuilder { + pub fn new() -> Self { + Self { + fork_service: None, + duties_service: None, + validator_store: None, + slot_clock: None, + beacon_node: None, + context: None, + } + } + + pub fn fork_service(mut self, service: ForkService) -> Self { + self.fork_service = Some(service); + self + } + + pub fn duties_service(mut self, service: DutiesService) -> Self { + self.duties_service = Some(service); + self + } + + pub fn validator_store(mut self, store: ValidatorStore) -> Self { + self.validator_store = Some(store); + self + } + + pub fn slot_clock(mut self, slot_clock: T) -> Self { + self.slot_clock = Some(Arc::new(slot_clock)); + self + } + + pub fn beacon_node(mut self, beacon_node: RemoteBeaconNode) -> Self { + self.beacon_node = Some(beacon_node); + self + } + + pub fn runtime_context(mut self, context: RuntimeContext) -> Self { + self.context = Some(context); + self + } + + pub fn build(self) -> Result, String> { + Ok(AttestationService { + fork_service: self + .fork_service + .ok_or_else(|| "Cannot build AttestationService without fork_service")?, + duties_service: self + .duties_service + .ok_or_else(|| "Cannot build AttestationService without duties_service")?, + validator_store: self + .validator_store + .ok_or_else(|| "Cannot build AttestationService without validator_store")?, + slot_clock: self + .slot_clock + .ok_or_else(|| "Cannot build AttestationService without slot_clock")?, + beacon_node: self + .beacon_node + .ok_or_else(|| "Cannot build AttestationService without beacon_node")?, + context: self + .context + .ok_or_else(|| "Cannot build AttestationService without runtime_context")?, + }) + } +} + +#[derive(Clone)] +pub struct AttestationService { + duties_service: DutiesService, + fork_service: ForkService, + validator_store: ValidatorStore, + slot_clock: Arc, + beacon_node: RemoteBeaconNode, + context: RuntimeContext, +} + +// TODO: clean trait bounds. +impl AttestationService { + pub fn start_update_service(&self, spec: &ChainSpec) -> Result { + let log = self.context.log.clone(); + + let duration_to_next_slot = self + .slot_clock + .duration_to_next_slot() + .ok_or_else(|| "Unable to determine duration to next slot".to_string())?; + + let interval = { + let slot_duration = Duration::from_millis(spec.milliseconds_per_slot); + Interval::new( + Instant::now() + duration_to_next_slot + TIME_DELAY_FROM_SLOT, + slot_duration, + ) + }; + + info!( + log, + "Waiting for next slot"; + "seconds_to_wait" => duration_to_next_slot.as_secs() + ); + + let (exit_signal, exit_fut) = exit_future::signal(); + let service = self.clone(); + + self.context.executor.spawn( + interval + .map_err(move |e| { + error! { + log, + "Timer thread failed"; + "error" => format!("{}", e) + } + }) + .and_then(move |_| if exit_fut.is_live() { Ok(()) } else { Err(()) }) + .for_each(move |_| service.clone().do_update()), + ); + + Ok(exit_signal) + } + + fn do_update(self) -> impl Future { + let service = self.clone(); + let log = self.context.log.clone(); + + self.slot_clock + .now() + .ok_or_else(move || { + error!(log, "Duties manager failed to read slot clock"); + }) + .into_future() + .and_then(move |slot| { + let iter = service.duties_service.block_producers(slot).into_iter(); + + stream::unfold(iter, move |mut block_producers| { + let log_1 = service.context.log.clone(); + let log_2 = service.context.log.clone(); + let service_1 = service.clone(); + let service_2 = service.clone(); + let service_3 = service.clone(); + + block_producers.next().map(move |validator_pubkey| { + service_2 + .fork_service + .fork() + .ok_or_else(|| "Fork is unknown, unable to sign".to_string()) + .and_then(|fork| { + service_1 + .validator_store + .randao_reveal( + &validator_pubkey, + slot.epoch(E::slots_per_epoch()), + &fork, + ) + .map(|randao_reveal| (fork, randao_reveal)) + .ok_or_else(|| "Unable to produce randao reveal".to_string()) + }) + .into_future() + .and_then(move |(fork, randao_reveal)| { + service_1 + .beacon_node + .http + .validator() + .produce_block(slot, randao_reveal) + .map(|block| (fork, block)) + .map_err(|e| { + format!( + "Error from beacon node when producing block: {:?}", + e + ) + }) + }) + .and_then(move |(fork, block)| { + service_2 + .validator_store + .sign_block(&validator_pubkey, block, &fork) + .ok_or_else(|| "Unable to sign block".to_string()) + }) + .and_then(move |block| { + service_3 + .beacon_node + .http + .validator() + .publish_block(block) + .map_err(|e| { + format!( + "Error from beacon node when publishing block: {:?}", + e + ) + }) + }) + .map(move |publish_outcome| match publish_outcome { + PublishStatus::Valid => { + info!(log_1, "Successfully published block") + } + PublishStatus::Invalid(msg) => error!( + log_1, + "Published block was invalid"; + "message" => msg + ), + PublishStatus::Unknown => { + info!(log_1, "Unknown condition when publishing block") + } + }) + .map_err(move |e| { + error!( + log_2, + "Error whilst producing block"; + "message" => e + ) + }) + .then(|_| Ok(((), block_producers))) + }) + }) + .collect() + .map(|_| ()) + }) + } +} diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 8d3f24b783..5937d66a4f 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -10,7 +10,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::timer::Interval; -use types::{ChainSpec, Epoch, EthSpec, PublicKey, Slot}; +use types::{ChainSpec, CommitteeIndex, Epoch, EthSpec, PublicKey, Slot}; /// Delay this period of time after the slot starts. This allows the node to process the new slot. const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100); @@ -36,6 +36,8 @@ impl DutiesStore { // As long as a `HashMap` iterator does not return duplicate keys, neither will this // function. .filter(|(_validator_pubkey, validator_map)| { + // TODO: it would be more efficient to call the `validator_map` by key (epoch) + // instead of searching for slots. validator_map.iter().any(|(_epoch, duties)| { duties .block_proposal_slot @@ -48,6 +50,27 @@ impl DutiesStore { .collect() } + fn attesters(&self, slot: Slot) -> Vec { + self.store + .read() + .iter() + .filter_map(|(_validator_pubkey, validator_map)| { + validator_map + // TODO: it would be more efficient to call the `validator_map` by key (epoch) + // instead of searching for slots. + .iter() + .find(|(_epoch, duties)| { + duties + .attestation_slot + .map(|s| s == slot) + .unwrap_or_else(|| false) + }) + .map(|(_epoch, duties)| duties) + }) + .cloned() + .collect() + } + fn insert(&self, epoch: Epoch, duties: ValidatorDuty) -> InsertOutcome { let mut store = self.store.write(); @@ -177,6 +200,11 @@ impl DutiesService { self.store.block_producers(slot) } + /// Returns all `ValidatorDuty` for the given `slot`. + pub fn attesters(&self, slot: Slot) -> Vec { + self.store.attesters(slot) + } + pub fn start_update_service(&self, spec: &ChainSpec) -> Result { let log = self.context.log.clone(); diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 6f1c1b3371..587de74e32 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -1,3 +1,4 @@ +mod attestation_service; mod block_service; mod cli; mod config;