diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index a7b881b53d..a303e757aa 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -3,14 +3,15 @@ use crate::{ }; use environment::RuntimeContext; use exit_future::Signal; -use futures::{stream, Future, IntoFuture, Stream}; -use remote_beacon_node::{PublishStatus, RemoteBeaconNode}; -use slog::{error, info, trace, warn}; +use futures::{Future, Stream}; +use remote_beacon_node::{PublishStatus, RemoteBeaconNode, ValidatorDuty}; +use slog::{error, info, trace}; use slot_clock::SlotClock; +use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::timer::Interval; -use types::{ChainSpec, EthSpec}; +use types::{ChainSpec, CommitteeIndex, EthSpec, Fork, Slot}; /// Delay this period of time after the slot starts. This allows the node to process the new slot. const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100); @@ -20,7 +21,7 @@ pub struct AttestationServiceBuilder { fork_service: Option>, duties_service: Option>, validator_store: Option>, - slot_clock: Option>, + slot_clock: Option, beacon_node: Option>, context: Option>, } @@ -54,7 +55,7 @@ impl AttestationServiceBuilder } pub fn slot_clock(mut self, slot_clock: T) -> Self { - self.slot_clock = Some(Arc::new(slot_clock)); + self.slot_clock = Some(slot_clock); self } @@ -70,44 +71,52 @@ impl AttestationServiceBuilder pub fn build(self) -> Result, String> { Ok(AttestationService { - fork_service: self - .fork_service - .ok_or_else(|| "Cannot build AttestationService without fork_service")?, - duties_service: self - .duties_service - .ok_or_else(|| "Cannot build AttestationService without duties_service")?, - validator_store: self - .validator_store - .ok_or_else(|| "Cannot build AttestationService without validator_store")?, - slot_clock: self - .slot_clock - .ok_or_else(|| "Cannot build AttestationService without slot_clock")?, - beacon_node: self - .beacon_node - .ok_or_else(|| "Cannot build AttestationService without beacon_node")?, - context: self - .context - .ok_or_else(|| "Cannot build AttestationService without runtime_context")?, + inner: Arc::new(Inner { + fork_service: self + .fork_service + .ok_or_else(|| "Cannot build AttestationService without fork_service")?, + duties_service: self + .duties_service + .ok_or_else(|| "Cannot build AttestationService without duties_service")?, + validator_store: self + .validator_store + .ok_or_else(|| "Cannot build AttestationService without validator_store")?, + slot_clock: self + .slot_clock + .ok_or_else(|| "Cannot build AttestationService without slot_clock")?, + beacon_node: self + .beacon_node + .ok_or_else(|| "Cannot build AttestationService without beacon_node")?, + context: self + .context + .ok_or_else(|| "Cannot build AttestationService without runtime_context")?, + }), }) } } -#[derive(Clone)] -pub struct AttestationService { +pub struct Inner { duties_service: DutiesService, fork_service: ForkService, validator_store: ValidatorStore, - slot_clock: Arc, + slot_clock: T, beacon_node: RemoteBeaconNode, context: RuntimeContext, } +#[derive(Clone)] +pub struct AttestationService { + inner: Arc>, +} + // TODO: clean trait bounds. impl AttestationService { pub fn start_update_service(&self, spec: &ChainSpec) -> Result { - let log = self.context.log.clone(); + let context = &self.inner.context; + let log = context.log.clone(); let duration_to_next_slot = self + .inner .slot_clock .duration_to_next_slot() .ok_or_else(|| "Unable to determine duration to next slot".to_string())?; @@ -115,7 +124,7 @@ impl AttestationService { let interval = { let slot_duration = Duration::from_millis(spec.milliseconds_per_slot); Interval::new( - Instant::now() + duration_to_next_slot + TIME_DELAY_FROM_SLOT, + Instant::now() + duration_to_next_slot * 3 / 2 + TIME_DELAY_FROM_SLOT, slot_duration, ) }; @@ -128,118 +137,179 @@ impl AttestationService { let (exit_signal, exit_fut) = exit_future::signal(); let service = self.clone(); + let log_1 = log.clone(); + let log_2 = log.clone(); - self.context.executor.spawn( + context.executor.spawn( interval .map_err(move |e| { error! { - log, + log_1, "Timer thread failed"; "error" => format!("{}", e) } }) .and_then(move |_| if exit_fut.is_live() { Ok(()) } else { Err(()) }) - .for_each(move |_| service.clone().do_update()), + .for_each(move |_| { + if let Err(e) = service.clone().spawn_attestation_tasks() { + error!( + log_2, + "Failed to spawn attestation tasks"; + "error" => e + ) + } else { + trace!( + log_2, + "Spawned attestation tasks"; + ) + } + + Ok(()) + }) + // Prevent any errors from escaping and stopping the interval. + .then(|_| Ok(())), ); Ok(exit_signal) } - fn do_update(self) -> impl Future { - let service = self.clone(); - let log = self.context.log.clone(); + fn spawn_attestation_tasks(&self) -> Result<(), String> { + let inner = self.inner.clone(); - self.slot_clock + let slot = inner + .slot_clock .now() - .ok_or_else(move || { - error!(log, "Duties manager failed to read slot clock"); - }) - .into_future() - .and_then(move |slot| { - let iter = service.duties_service.block_producers(slot).into_iter(); + .ok_or_else(|| "Failed to read slot clock".to_string())?; + let fork = inner + .fork_service + .fork() + .ok_or_else(|| "Failed to get Fork".to_string())?; - stream::unfold(iter, move |mut block_producers| { - let log_1 = service.context.log.clone(); - let log_2 = service.context.log.clone(); - let service_1 = service.clone(); - let service_2 = service.clone(); - let service_3 = service.clone(); + let mut committee_indices: HashMap> = HashMap::new(); - block_producers.next().map(move |validator_pubkey| { - service_2 - .fork_service - .fork() - .ok_or_else(|| "Fork is unknown, unable to sign".to_string()) - .and_then(|fork| { - service_1 + inner + .duties_service + .attesters(slot) + .into_iter() + .for_each(|duty| { + if let Some(committee_index) = duty.attestation_committee_index { + let validator_duties = + committee_indices.entry(committee_index).or_insert(vec![]); + + validator_duties.push(duty); + } + }); + + committee_indices + .into_iter() + .for_each(|(committee_index, validator_duties)| { + // Spawn a separate task for each attestation. + inner.context.executor.spawn(self.clone().do_attestation( + slot, + committee_index, + validator_duties, + fork.clone(), + )); + }); + + Ok(()) + } + + fn do_attestation( + self, + slot: Slot, + committee_index: CommitteeIndex, + validator_duties: Vec, + fork: Fork, + ) -> impl Future { + let inner_1 = self.inner.clone(); + let inner_2 = self.inner.clone(); + let log_1 = self.inner.context.log.clone(); + let log_2 = self.inner.context.log.clone(); + + self.inner + .beacon_node + .http + .validator() + .produce_attestation(slot, committee_index) + .map_err(|e| format!("Failed to produce attestation: {:?}", e)) + .and_then(move |attestation| { + validator_duties + .iter() + .try_fold(attestation, |attestation, duty| { + let log = inner_1.context.log.clone(); + + if let Some(( + duty_slot, + duty_committee_index, + validator_committee_position, + )) = attestation_duties(duty) + { + if duty_slot == slot && duty_committee_index == committee_index { + inner_1 .validator_store - .randao_reveal( - &validator_pubkey, - slot.epoch(E::slots_per_epoch()), + .sign_attestation( + &duty.validator_pubkey, + validator_committee_position, + attestation, &fork, ) - .map(|randao_reveal| (fork, randao_reveal)) - .ok_or_else(|| "Unable to produce randao reveal".to_string()) - }) - .into_future() - .and_then(move |(fork, randao_reveal)| { - service_1 - .beacon_node - .http - .validator() - .produce_block(slot, randao_reveal) - .map(|block| (fork, block)) - .map_err(|e| { - format!( - "Error from beacon node when producing block: {:?}", - e - ) - }) - }) - .and_then(move |(fork, block)| { - service_2 - .validator_store - .sign_block(&validator_pubkey, block, &fork) - .ok_or_else(|| "Unable to sign block".to_string()) - }) - .and_then(move |block| { - service_3 - .beacon_node - .http - .validator() - .publish_block(block) - .map_err(|e| { - format!( - "Error from beacon node when publishing block: {:?}", - e - ) - }) - }) - .map(move |publish_outcome| match publish_outcome { - PublishStatus::Valid => { - info!(log_1, "Successfully published block") - } - PublishStatus::Invalid(msg) => error!( - log_1, - "Published block was invalid"; - "message" => msg - ), - PublishStatus::Unknown => { - info!(log_1, "Unknown condition when publishing block") - } - }) - .map_err(move |e| { - error!( - log_2, - "Error whilst producing block"; - "message" => e - ) - }) - .then(|_| Ok(((), block_producers))) + .ok_or_else(|| "Unable to sign attestation".to_string()) + } else { + error!(log, "Inconsistent validator duties during signing"); + + Ok(attestation) + } + } else { + error!(log, "Missing validator duties when signing"); + + Ok(attestation) + } }) - }) - .collect() - .map(|_| ()) + }) + .and_then(move |attestation| { + inner_2 + .beacon_node + .http + .validator() + .publish_attestation(attestation.clone()) + .map(|publish_status| (attestation, publish_status)) + .map_err(|e| format!("Failed to publish attestation: {:?}", e)) + }) + .map(move |(attestation, publish_status)| match publish_status { + PublishStatus::Valid => info!( + log_1, + "Successfully published attestation"; + "signatures" => attestation.aggregation_bits.num_set_bits(), + "head_block" => format!("{}", attestation.data.beacon_block_root), + "committee_index" => attestation.data.index, + "slot" => attestation.data.slot.as_u64(), + ), + PublishStatus::Invalid(msg) => error!( + log_1, + "Published attestation was invalid"; + "message" => msg, + "committee_index" => attestation.data.index, + "slot" => attestation.data.slot.as_u64(), + ), + PublishStatus::Unknown => { + info!(log_1, "Unknown condition when publishing attestation") + } + }) + .map_err(move |e| { + error!( + log_2, + "Error during attestation production"; + "error" => e + ) }) } } + +pub fn attestation_duties(duty: &ValidatorDuty) -> Option<(Slot, CommitteeIndex, usize)> { + Some(( + duty.attestation_slot?, + duty.attestation_committee_index?, + duty.attestation_committee_position?, + )) +} diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index a6e91d4e45..7c74017837 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -5,7 +5,7 @@ use environment::RuntimeContext; use exit_future::Signal; use futures::{stream, Future, IntoFuture, Stream}; use remote_beacon_node::{PublishStatus, RemoteBeaconNode}; -use slog::{error, info, trace, warn}; +use slog::{error, info}; use slot_clock::SlotClock; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -139,7 +139,9 @@ impl BlockService { } }) .and_then(move |_| if exit_fut.is_live() { Ok(()) } else { Err(()) }) - .for_each(move |_| service.clone().do_update()), + .for_each(move |_| service.clone().do_update()) + // Prevent any errors from escaping and stopping the interval. + .then(|_| Ok(())), ); Ok(exit_signal) @@ -207,7 +209,8 @@ impl BlockService { .beacon_node .http .validator() - .publish_block(block) + .publish_block(block.clone()) + .map(|publish_status| (block, publish_status)) .map_err(|e| { format!( "Error from beacon node when publishing block: {:?}", @@ -215,14 +218,19 @@ impl BlockService { ) }) }) - .map(move |publish_outcome| match publish_outcome { - PublishStatus::Valid => { - info!(log_1, "Successfully published block") - } + .map(move |(block, publish_status)| match publish_status { + PublishStatus::Valid => info!( + log_1, + "Successfully published block"; + "deposits" => block.body.deposits.len(), + "attestations" => block.body.attestations.len(), + "slot" => block.slot.as_u64(), + ), PublishStatus::Invalid(msg) => error!( log_1, "Published block was invalid"; - "message" => msg + "message" => msg, + "slot" => block.slot.as_u64(), ), PublishStatus::Unknown => { info!(log_1, "Unknown condition when publishing block") diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 5937d66a4f..683053b8d8 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -29,43 +29,42 @@ pub struct DutiesStore { } impl DutiesStore { - fn block_producers(&self, slot: Slot) -> Vec { + fn block_producers(&self, slot: Slot, slots_per_epoch: u64) -> Vec { self.store .read() .iter() // As long as a `HashMap` iterator does not return duplicate keys, neither will this // function. - .filter(|(_validator_pubkey, validator_map)| { - // TODO: it would be more efficient to call the `validator_map` by key (epoch) - // instead of searching for slots. - validator_map.iter().any(|(_epoch, duties)| { - duties - .block_proposal_slot - .map(|proposal_slot| proposal_slot == slot) - .unwrap_or_else(|| false) + .filter_map(|(_validator_pubkey, validator_map)| { + let epoch = slot.epoch(slots_per_epoch); + + validator_map.get(&epoch).and_then(|duties| { + if duties.block_proposal_slot == Some(slot) { + Some(duties.validator_pubkey.clone()) + } else { + None + } }) }) - .map(|(validator_pubkey, _validator_map)| validator_pubkey) - .cloned() .collect() } - fn attesters(&self, slot: Slot) -> Vec { + fn attesters(&self, slot: Slot, slots_per_epoch: u64) -> Vec { self.store .read() .iter() + // As long as a `HashMap` iterator does not return duplicate keys, neither will this + // function. .filter_map(|(_validator_pubkey, validator_map)| { - validator_map - // TODO: it would be more efficient to call the `validator_map` by key (epoch) - // instead of searching for slots. - .iter() - .find(|(_epoch, duties)| { - duties - .attestation_slot - .map(|s| s == slot) - .unwrap_or_else(|| false) - }) - .map(|(_epoch, duties)| duties) + let epoch = slot.epoch(slots_per_epoch); + + validator_map.get(&epoch).and_then(|duties| { + if duties.attestation_slot == Some(slot) { + Some(duties) + } else { + None + } + }) }) .cloned() .collect() @@ -197,12 +196,12 @@ impl DutiesService { /// It is possible that multiple validators have an identical proposal slot, however that is /// likely the result of heavy forking (lol) or inconsistent beacon node connections. pub fn block_producers(&self, slot: Slot) -> Vec { - self.store.block_producers(slot) + self.store.block_producers(slot, E::slots_per_epoch()) } /// Returns all `ValidatorDuty` for the given `slot`. pub fn attesters(&self, slot: Slot) -> Vec { - self.store.attesters(slot) + self.store.attesters(slot, E::slots_per_epoch()) } pub fn start_update_service(&self, spec: &ChainSpec) -> Result { @@ -243,7 +242,9 @@ impl DutiesService { } }) .and_then(move |_| if exit_fut.is_live() { Ok(()) } else { Err(()) }) - .for_each(move |_| service.clone().do_update()), + .for_each(move |_| service.clone().do_update()) + // Prevent any errors from escaping and stopping the interval. + .then(|_| Ok(())), ); Ok(exit_signal) @@ -284,9 +285,6 @@ impl DutiesService { }) }) .map(|_| ()) - // Returning an error will stop the interval. This is not desired, a single failure - // should not stop all future attempts. - .then(|_| Ok(())) } fn update_epoch(self, epoch: Epoch) -> impl Future { diff --git a/validator_client/src/fork_service.rs b/validator_client/src/fork_service.rs index 823b948396..42a3a31784 100644 --- a/validator_client/src/fork_service.rs +++ b/validator_client/src/fork_service.rs @@ -125,7 +125,9 @@ impl ForkService { } }) .and_then(move |_| if exit_fut.is_live() { Ok(()) } else { Err(()) }) - .for_each(move |_| service.clone().do_update()), + .for_each(move |_| service.clone().do_update()) + // Prevent any errors from escaping and stopping the interval. + .then(|_| Ok(())), ); Ok(exit_signal) diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 587de74e32..88b1afc3d7 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -11,6 +11,7 @@ pub mod validator_directory; pub use cli::cli_app; pub use config::Config; +use attestation_service::{AttestationService, AttestationServiceBuilder}; use block_service::{BlockService, BlockServiceBuilder}; use clap::ArgMatches; use config::{Config as ClientConfig, KeySource}; @@ -38,6 +39,7 @@ pub struct ProductionValidatorClient { duties_service: DutiesService, fork_service: ForkService, block_service: BlockService, + attestation_service: AttestationService, exit_signals: Arc>>, } @@ -158,12 +160,21 @@ impl ProductionValidatorClient { .build()?; let block_service = BlockServiceBuilder::new() + .duties_service(duties_service.clone()) + .fork_service(fork_service.clone()) + .slot_clock(slot_clock.clone()) + .validator_store(validator_store.clone()) + .beacon_node(beacon_node.clone()) + .runtime_context(context.service_context("block")) + .build()?; + + let attestation_service = AttestationServiceBuilder::new() .duties_service(duties_service.clone()) .fork_service(fork_service.clone()) .slot_clock(slot_clock) .validator_store(validator_store) .beacon_node(beacon_node) - .runtime_context(context.service_context("block")) + .runtime_context(context.service_context("attestation")) .build()?; Ok(Self { @@ -171,6 +182,7 @@ impl ProductionValidatorClient { duties_service, fork_service, block_service, + attestation_service, exit_signals: Arc::new(RwLock::new(vec![])), }) }) @@ -198,6 +210,13 @@ impl ProductionValidatorClient { self.exit_signals.write().push(block_exit); + let attestation_exit = self + .attestation_service + .start_update_service(&self.context.eth2_config.spec) + .map_err(|e| format!("Unable to start attestation service: {}", e))?; + + self.exit_signals.write().push(attestation_exit); + Ok(()) } } diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index 81feb4af73..ca647a7c45 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -8,12 +8,15 @@ use std::marker::PhantomData; use std::path::PathBuf; use std::sync::Arc; use tree_hash::{SignedRoot, TreeHash}; -use types::{BeaconBlock, ChainSpec, Domain, Epoch, EthSpec, Fork, PublicKey, Signature}; +use types::{ + Attestation, BeaconBlock, ChainSpec, Domain, Epoch, EthSpec, Fork, PublicKey, Signature, +}; #[derive(Clone)] pub struct ValidatorStore { validators: Arc>>, spec: Arc, + log: Logger, _phantom: PhantomData, } @@ -51,6 +54,7 @@ impl ValidatorStore { Ok(Self { validators: Arc::new(RwLock::new(HashMap::from_iter(validator_iter))), spec: Arc::new(spec), + log, _phantom: PhantomData, }) } @@ -106,4 +110,40 @@ impl ValidatorStore { }) }) } + + pub fn sign_attestation( + &self, + validator_pubkey: &PublicKey, + validator_committee_position: usize, + mut attestation: Attestation, + fork: &Fork, + ) -> Option> { + // TODO: check for slashing. + self.validators + .read() + .get(validator_pubkey) + .and_then(|validator_dir| { + validator_dir + .voting_keypair + .as_ref() + .and_then(|voting_keypair| { + attestation + .sign( + &voting_keypair.sk, + validator_committee_position, + fork, + &self.spec, + ) + .map_err(|e| { + error!( + self.log, + "Error whilst signing attestation"; + "error" => format!("{:?}", e) + ) + }) + .map(|()| attestation) + .ok() + }) + }) + } }