From c99b134e0fc18a45dfe51cb6be5094e59b7b91c2 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Thu, 7 May 2020 10:29:26 +0530 Subject: [PATCH] Port validator_client to stable futures (#1114) * Add PH & MS slot clock changes * Account for genesis time * Add progress on duties refactor * Add simple is_aggregator bool to val subscription * Start work on attestation_verification.rs * Add progress on ObservedAttestations * Progress with ObservedAttestations * Fix tests * Add observed attestations to the beacon chain * Add attestation observation to processing code * Add progress on attestation verification * Add first draft of ObservedAttesters * Add more tests * Add observed attesters to beacon chain * Add observers to attestation processing * Add more attestation verification * Create ObservedAggregators map * Remove commented-out code * Add observed aggregators into chain * Add progress * Finish adding features to attestation verification * Ensure beacon chain compiles * Link attn verification into chain * Integrate new attn verification in chain * Remove old attestation processing code * Start trying to fix beacon_chain tests * Split adding into pools into two functions * Add aggregation to harness * Get test harness working again * Adjust the number of aggregators for test harness * Fix edge-case in harness * Integrate new attn processing in network * Fix compile bug in validator_client * Update validator API endpoints * Fix aggreagation in test harness * Fix enum thing * Fix attestation observation bug: * Patch failing API tests * Start adding comments to attestation verification * Remove unused attestation field * Unify "is block known" logic * Update comments * Supress fork choice errors for network processing * Add todos * Tidy * Add gossip attn tests * Disallow test harness to produce old attns * Comment out in-progress tests * Partially address pruning tests * Fix failing store test * Add aggregate tests * Add comments about which spec conditions we check * Dont re-aggregate * Split apart test harness attn production * Fix compile error in network * Make progress on commented-out test * Fix skipping attestation test * Add fork choice verification tests * Tidy attn tests, remove dead code * Remove some accidentally added code * Fix clippy lint * Rename test file * Add block tests, add cheap block proposer check * Rename block testing file * Add observed_block_producers * Tidy * Switch around block signature verification * Finish block testing * Remove gossip from signature tests * First pass of self review * Fix deviation in spec * Update test spec tags * Start moving over to hashset * Finish moving observed attesters to hashmap * Move aggregation pool over to hashmap * Make fc attn borrow again * Fix rest_api compile error * Fix missing comments * Fix monster test * Uncomment increasing slots test * Address remaining comments * Remove unsafe, use cfg test * Remove cfg test flag * Fix dodgy comment * Revert "Update hashmap hashset to stable futures" This reverts commit d432378a3cc5cd67fc29c0b15b96b886c1323554. * Revert "Adds panic test to hashset delay" This reverts commit 281502396fc5b90d9c421a309c2c056982c9525b. * Ported attestation_service * Ported duties_service * Ported fork_service * More ports * Port block_service * Minor fixes * VC compiles * Update TODOS * Borrow self where possible * Ignore aggregates that are already known. * Unify aggregator modulo logic * Fix typo in logs * Refactor validator subscription logic * Avoid reproducing selection proof * Skip HTTP call if no subscriptions * Rename DutyAndState -> DutyAndProof * Tidy logs * Print root as dbg * Fix compile errors in tests * Fix compile error in test * Re-Fix attestation and duties service * Minor fixes Co-authored-by: Paul Hauner --- Cargo.lock | 1 - validator_client/Cargo.toml | 5 +- validator_client/src/attestation_service.rs | 593 ++++++++++---------- validator_client/src/block_service.rs | 211 +++---- validator_client/src/duties_service.rs | 403 +++++++------ validator_client/src/fork_service.rs | 62 +- validator_client/src/lib.rs | 383 ++++++------- validator_client/src/notifier.rs | 100 ++-- validator_client/src/validator_directory.rs | 38 +- 9 files changed, 826 insertions(+), 970 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d5a112423f..84889b41ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5253,7 +5253,6 @@ dependencies = [ "slot_clock", "tempdir", "tokio 0.2.20", - "tokio-timer 0.2.13", "tree_hash", "types", "web3", diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index fd29064391..657726adda 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -23,11 +23,10 @@ serde_json = "1.0.52" slog = { version = "2.5.2", features = ["max_level_trace", "release_max_level_trace"] } slog-async = "2.5.0" slog-term = "2.5.0" -tokio = "0.2.20" -tokio-timer = "0.2.13" +tokio = {version = "0.2.20", features = ["time"]} error-chain = "0.12.2" bincode = "1.2.1" -futures = "0.3.4" +futures = {version ="0.3.4", features = ["compat"]} dirs = "2.0.2" logging = { path = "../eth2/utils/logging" } environment = { path = "../lighthouse/environment" } diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index 82e1a5745a..5ef22b6c07 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -4,15 +4,14 @@ use crate::{ }; use environment::RuntimeContext; use exit_future::Signal; -use futures::{future, Future, Stream}; +use futures::{FutureExt, StreamExt}; use remote_beacon_node::{PublishStatus, RemoteBeaconNode}; use slog::{crit, debug, info, trace}; use slot_clock::SlotClock; use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; -use std::time::{Duration, Instant}; -use tokio::timer::{Delay, Interval}; +use tokio::time::{delay_until, interval_at, Duration, Instant}; use types::{Attestation, ChainSpec, CommitteeIndex, EthSpec, Slot}; /// Builds an `AttestationService`. @@ -130,7 +129,8 @@ impl AttestationService { .ok_or_else(|| "Unable to determine duration to next slot".to_string())?; let interval = { - Interval::new( + // Note: `interval_at` panics if `slot_duration` is 0 + interval_at( Instant::now() + duration_to_next_slot + slot_duration / 3, slot_duration, ) @@ -140,38 +140,28 @@ impl AttestationService { let service = self.clone(); let log_1 = log.clone(); let log_2 = log.clone(); - let log_3 = log.clone(); - context.executor.spawn( - exit_fut - .until( - interval - .map_err(move |e| { - crit! { - log_1, - "Timer thread failed"; - "error" => format!("{}", e) - } - }) - .for_each(move |_| { - if let Err(e) = service.spawn_attestation_tasks(slot_duration) { - crit!( - log_2, - "Failed to spawn attestation tasks"; - "error" => e - ) - } else { - trace!( - log_2, - "Spawned attestation tasks"; - ) - } - - Ok(()) - }), + let interval_fut = interval.for_each(move |_| { + if let Err(e) = service.spawn_attestation_tasks(slot_duration) { + crit!( + log_1, + "Failed to spawn attestation tasks"; + "error" => e ) - .map(move |_| info!(log_3, "Shutdown complete")), + } else { + trace!( + log_1, + "Spawned attestation tasks"; + ) + } + futures::future::ready(()) + }); + + let future = futures::future::select( + interval_fut, + exit_fut.map(move |_| info!(log_2, "Shutdown complete")), ); + tokio::task::spawn(future); Ok(exit_signal) } @@ -181,11 +171,11 @@ impl AttestationService { fn spawn_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> { let service = self.clone(); - let slot = service + let slot = self .slot_clock .now() .ok_or_else(|| "Failed to read slot clock".to_string())?; - let duration_to_next_slot = service + let duration_to_next_slot = self .slot_clock .duration_to_next_slot() .ok_or_else(|| "Unable to determine duration to next slot".to_string())?; @@ -197,7 +187,7 @@ impl AttestationService { .checked_sub(slot_duration / 3) .unwrap_or_else(|| Duration::from_secs(0)); - let duties_by_committee_index: HashMap> = service + let duties_by_committee_index: HashMap> = self .duties_service .attesters(slot) .into_iter() @@ -219,15 +209,12 @@ impl AttestationService { .into_iter() .for_each(|(committee_index, validator_duties)| { // Spawn a separate task for each attestation. - service - .context - .executor - .spawn(self.clone().publish_attestations_and_aggregates( - slot, - committee_index, - validator_duties, - aggregate_production_instant, - )); + tokio::task::spawn(service.clone().publish_attestations_and_aggregates( + slot, + committee_index, + validator_duties, + aggregate_production_instant, + )); }); Ok(()) @@ -242,75 +229,68 @@ impl AttestationService { /// /// The given `validator_duties` should already be filtered to only contain those that match /// `slot` and `committee_index`. Critical errors will be logged if this is not the case. - fn publish_attestations_and_aggregates( - &self, + async fn publish_attestations_and_aggregates( + self, slot: Slot, committee_index: CommitteeIndex, validator_duties: Vec, aggregate_production_instant: Instant, - ) -> Box + Send> { + ) -> Result<(), ()> { // There's not need to produce `Attestation` or `SignedAggregateAndProof` if we do not have // any validators for the given `slot` and `committee_index`. if validator_duties.is_empty() { - return Box::new(future::ok(())); + return Ok(()); } - let service_1 = self.clone(); let log_1 = self.context.log.clone(); + let log_2 = self.context.log.clone(); let validator_duties_1 = Arc::new(validator_duties); let validator_duties_2 = validator_duties_1.clone(); - Box::new( - // Step 1. - // - // Download, sign and publish an `Attestation` for each validator. - self.produce_and_publish_attestations(slot, committee_index, validator_duties_1) - .and_then::<_, Box + Send>>( - move |attestation_opt| { - if let Some(attestation) = attestation_opt { - Box::new( - // Step 2. (Only if step 1 produced an attestation) - // - // First, wait until the `aggregation_production_instant` (2/3rds - // of the way though the slot). As verified in the - // `delay_triggers_when_in_the_past` test, this code will still run - // even if the instant has already elapsed. - // - // Then download, sign and publish a `SignedAggregateAndProof` for each - // validator that is elected to aggregate for this `slot` and - // `committee_index`. - Delay::new(aggregate_production_instant) - .map_err(|e| { - format!( - "Unable to create aggregate production delay: {:?}", - e - ) - }) - .and_then(move |()| { - service_1.produce_and_publish_aggregates( - attestation, - validator_duties_2, - ) - }), - ) - } else { - // If `produce_and_publish_attestations` did not download any - // attestations then there is no need to produce any - // `SignedAggregateAndProof`. - Box::new(future::ok(())) - } - }, + // Step 1. + // + // Download, sign and publish an `Attestation` for each validator. + let attestation_opt = self + .produce_and_publish_attestations(slot, committee_index, validator_duties_1) + .await + .map_err(move |e| { + crit!( + log_1, + "Error during attestation routine"; + "error" => format!("{:?}", e), + "committee_index" => committee_index, + "slot" => slot.as_u64(), ) - .map_err(move |e| { - crit!( - log_1, - "Error during attestation routine"; - "error" => format!("{:?}", e), - "committee_index" => committee_index, - "slot" => slot.as_u64(), - ) - }), - ) + })?; + if let Some(attestation) = attestation_opt { + // Step 2. (Only if step 1 produced an attestation) + // + // First, wait until the `aggregation_production_instant` (2/3rds + // of the way though the slot). As verified in the + // `delay_triggers_when_in_the_past` test, this code will still run + // even if the instant has already elapsed. + // + // Then download, sign and publish a `SignedAggregateAndProof` for each + // validator that is elected to aggregate for this `slot` and + // `committee_index`. + delay_until(aggregate_production_instant).await; + self.produce_and_publish_aggregates(attestation, validator_duties_2) + .await + } else { + // If `produce_and_publish_attestations` did not download any + // attestations then there is no need to produce any + // `SignedAggregateAndProof`. + Ok(()) + } + .map_err(move |e| { + crit!( + log_2, + "Error during attestation routine"; + "error" => format!("{:?}", e), + "committee_index" => committee_index, + "slot" => slot.as_u64(), + ) + }) } /// Performs the first step of the attesting process: downloading `Attestation` objects, @@ -325,139 +305,132 @@ impl AttestationService { /// /// Only one `Attestation` is downloaded from the BN. It is then cloned and signed by each /// validator and the list of individually-signed `Attestation` objects is returned to the BN. - fn produce_and_publish_attestations( + async fn produce_and_publish_attestations( &self, slot: Slot, committee_index: CommitteeIndex, validator_duties: Arc>, - ) -> Box>, Error = String> + Send> { + ) -> Result>, String> { if validator_duties.is_empty() { - return Box::new(future::ok(None)); + return Ok(None); } - let service = self.clone(); + let attestation = self + .beacon_node + .http + .validator() + .produce_attestation(slot, committee_index) + .await + .map_err(|e| format!("Failed to produce attestation: {:?}", e))?; + + let log = self.context.log.clone(); + + // For each validator in `validator_duties`, clone the `attestation` and add + // their signature. + // + // If any validator is unable to sign, they are simply skipped. + let signed_attestations = validator_duties + .iter() + .filter_map(|duty| { + let log = self.context.log.clone(); + // Ensure that all required fields are present in the validator duty. + let (duty_slot, duty_committee_index, validator_committee_position, _) = + if let Some(tuple) = duty.attestation_duties() { + tuple + } else { + crit!( + log, + "Missing validator duties when signing"; + "duties" => format!("{:?}", duty) + ); + return None; + }; + + // Ensure that the attestation matches the duties. + if duty_slot != attestation.data.slot + || duty_committee_index != attestation.data.index + { + crit!( + log, + "Inconsistent validator duties during signing"; + "validator" => format!("{:?}", duty.validator_pubkey()), + "duty_slot" => duty_slot, + "attestation_slot" => attestation.data.slot, + "duty_index" => duty_committee_index, + "attestation_index" => attestation.data.index, + ); + return None; + } + + let mut attestation = attestation.clone(); + + if self + .validator_store + .sign_attestation( + duty.validator_pubkey(), + validator_committee_position, + &mut attestation, + ) + .is_none() + { + crit!( + log, + "Attestation signing refused"; + "validator" => format!("{:?}", duty.validator_pubkey()), + "slot" => attestation.data.slot, + "index" => attestation.data.index, + ); + None + } else { + Some(attestation) + } + }) + .collect::>(); + + // If there are any signed attestations, publish them to the BN. Otherwise, + // just return early. + if let Some(attestation) = signed_attestations.first().cloned() { + let num_attestations = signed_attestations.len(); + let beacon_block_root = attestation.data.beacon_block_root; - Box::new( self.beacon_node .http .validator() - .produce_attestation(slot, committee_index) - .map_err(|e| format!("Failed to produce attestation: {:?}", e)) - .and_then::<_, Box + Send>>(move |attestation| { - let log = service.context.log.clone(); - - // For each validator in `validator_duties`, clone the `attestation` and add - // their signature. - // - // If any validator is unable to sign, they are simply skipped. - let signed_attestations = validator_duties - .iter() - .filter_map(|duty| { - let log = service.context.log.clone(); - - // Ensure that all required fields are present in the validator duty. - let (duty_slot, duty_committee_index, validator_committee_position, _) = - if let Some(tuple) = duty.attestation_duties() { - tuple - } else { - crit!( - log, - "Missing validator duties when signing"; - "duties" => format!("{:?}", duty) - ); - return None; - }; - - // Ensure that the attestation matches the duties. - if duty_slot != attestation.data.slot - || duty_committee_index != attestation.data.index - { - crit!( - log, - "Inconsistent validator duties during signing"; - "validator" => format!("{:?}", duty.validator_pubkey()), - "duty_slot" => duty_slot, - "attestation_slot" => attestation.data.slot, - "duty_index" => duty_committee_index, - "attestation_index" => attestation.data.index, - ); - return None; - } - - let mut attestation = attestation.clone(); - - if service - .validator_store - .sign_attestation( - duty.validator_pubkey(), - validator_committee_position, - &mut attestation, - ) - .is_none() - { - crit!( - log, - "Attestation signing refused"; - "validator" => format!("{:?}", duty.validator_pubkey()), - "slot" => attestation.data.slot, - "index" => attestation.data.index, - ); - None - } else { - Some(attestation) - } - }) - .collect::>(); - - // If there are any signed attestations, publish them to the BN. Otherwise, - // just return early. - if let Some(attestation) = signed_attestations.first().cloned() { - let num_attestations = signed_attestations.len(); - let beacon_block_root = attestation.data.beacon_block_root; - - Box::new( - service - .beacon_node - .http - .validator() - .publish_attestations(signed_attestations) - .map_err(|e| format!("Failed to publish attestation: {:?}", e)) - .map(move |publish_status| match publish_status { - PublishStatus::Valid => info!( - log, - "Successfully published attestations"; - "count" => num_attestations, - "head_block" => format!("{:?}", beacon_block_root), - "committee_index" => committee_index, - "slot" => slot.as_u64(), - "type" => "unaggregated", - ), - PublishStatus::Invalid(msg) => crit!( - log, - "Published attestation was invalid"; - "message" => msg, - "committee_index" => committee_index, - "slot" => slot.as_u64(), - "type" => "unaggregated", - ), - PublishStatus::Unknown => crit!( - log, - "Unknown condition when publishing unagg. attestation" - ), - }) - .map(|()| Some(attestation)), - ) - } else { - debug!( - log, - "No attestations to publish"; - "committee_index" => committee_index, - "slot" => slot.as_u64(), - ); - Box::new(future::ok(None)) + .publish_attestations(signed_attestations) + .await + .map_err(|e| format!("Failed to publish attestation: {:?}", e)) + .map(move |publish_status| match publish_status { + PublishStatus::Valid => info!( + log, + "Successfully published attestations"; + "count" => num_attestations, + "head_block" => format!("{:?}", beacon_block_root), + "committee_index" => committee_index, + "slot" => slot.as_u64(), + "type" => "unaggregated", + ), + PublishStatus::Invalid(msg) => crit!( + log, + "Published attestation was invalid"; + "message" => msg, + "committee_index" => committee_index, + "slot" => slot.as_u64(), + "type" => "unaggregated", + ), + PublishStatus::Unknown => { + crit!(log, "Unknown condition when publishing unagg. attestation") } - }), - ) + }) + .map(|()| Some(attestation)) + } else { + debug!( + log, + "No attestations to publish"; + "committee_index" => committee_index, + "slot" => slot.as_u64(), + ); + return Ok(None); + } } /// Performs the second step of the attesting process: downloading an aggregated `Attestation`, @@ -473,107 +446,105 @@ impl AttestationService { /// Only one aggregated `Attestation` is downloaded from the BN. It is then cloned and signed /// by each validator and the list of individually-signed `SignedAggregateAndProof` objects is /// returned to the BN. - fn produce_and_publish_aggregates( + async fn produce_and_publish_aggregates( &self, attestation: Attestation, validator_duties: Arc>, - ) -> impl Future { - let service_1 = self.clone(); + ) -> Result<(), String> { let log_1 = self.context.log.clone(); - self.beacon_node + let aggregated_attestation = self + .beacon_node .http .validator() .produce_aggregate_attestation(&attestation.data) - .map_err(|e| format!("Failed to produce an aggregate attestation: {:?}", e)) - .and_then::<_, Box + Send>>( - move |aggregated_attestation| { - // For each validator, clone the `aggregated_attestation` and convert it into - // a `SignedAggregateAndProof` - let signed_aggregate_and_proofs = validator_duties - .iter() - .filter_map(|duty_and_proof| { - // Do not produce a signed aggregator for validators that are not - // subscribed aggregators. - let selection_proof = duty_and_proof.selection_proof.as_ref()?.clone(); + .await + .map_err(|e| format!("Failed to produce an aggregate attestation: {:?}", e))?; - let (duty_slot, duty_committee_index, _, validator_index) = - duty_and_proof.attestation_duties().or_else(|| { - crit!(log_1, "Missing duties when signing aggregate"); - None - })?; + // For each validator, clone the `aggregated_attestation` and convert it into + // a `SignedAggregateAndProof` + let signed_aggregate_and_proofs = validator_duties + .iter() + .filter_map(|duty_and_proof| { + // Do not produce a signed aggregator for validators that are not + // subscribed aggregators. + let selection_proof = duty_and_proof.selection_proof.as_ref()?.clone(); - let pubkey = &duty_and_proof.duty.validator_pubkey; - let slot = attestation.data.slot; - let committee_index = attestation.data.index; + let (duty_slot, duty_committee_index, _, validator_index) = + duty_and_proof.attestation_duties().or_else(|| { + crit!(log_1, "Missing duties when signing aggregate"); + None + })?; - if duty_slot != slot || duty_committee_index != committee_index { - crit!(log_1, "Inconsistent validator duties during signing"); - return None; - } + let pubkey = &duty_and_proof.duty.validator_pubkey; + let slot = attestation.data.slot; + let committee_index = attestation.data.index; - if let Some(signed_aggregate_and_proof) = service_1 - .validator_store - .produce_signed_aggregate_and_proof( - pubkey, - validator_index, - aggregated_attestation.clone(), - selection_proof, - ) - { - Some(signed_aggregate_and_proof) - } else { - crit!(log_1, "Failed to sign attestation"); - None - } - }) - .collect::>(); + if duty_slot != slot || duty_committee_index != committee_index { + crit!(log_1, "Inconsistent validator duties during signing"); + return None; + } - // If there any signed aggregates and proofs were produced, publish them to the - // BN. - if let Some(first) = signed_aggregate_and_proofs.first().cloned() { - let attestation = first.message.aggregate; + if let Some(signed_aggregate_and_proof) = + self.validator_store.produce_signed_aggregate_and_proof( + pubkey, + validator_index, + aggregated_attestation.clone(), + selection_proof, + ) + { + Some(signed_aggregate_and_proof) + } else { + crit!(log_1, "Failed to sign attestation"); + None + } + }) + .collect::>(); - Box::new(service_1 - .beacon_node - .http - .validator() - .publish_aggregate_and_proof(signed_aggregate_and_proofs) - .map(|publish_status| (attestation, publish_status)) - .map_err(|e| format!("Failed to publish aggregate and proofs: {:?}", e)) - .map(move |(attestation, publish_status)| match publish_status { - PublishStatus::Valid => info!( - log_1, - "Successfully published attestations"; - "signatures" => attestation.aggregation_bits.num_set_bits(), - "head_block" => format!("{:?}", attestation.data.beacon_block_root), - "committee_index" => attestation.data.index, - "slot" => attestation.data.slot.as_u64(), - "type" => "aggregated", - ), - PublishStatus::Invalid(msg) => crit!( - log_1, - "Published attestation was invalid"; - "message" => msg, - "committee_index" => attestation.data.index, - "slot" => attestation.data.slot.as_u64(), - "type" => "aggregated", - ), - PublishStatus::Unknown => { - crit!(log_1, "Unknown condition when publishing agg. attestation") - } - })) - } else { - debug!( - log_1, - "No signed aggregates to publish"; - "committee_index" => attestation.data.index, - "slot" => attestation.data.slot.as_u64(), - ); - Box::new(future::ok(())) - } - }, - ) + // If there any signed aggregates and proofs were produced, publish them to the + // BN. + if let Some(first) = signed_aggregate_and_proofs.first().cloned() { + let attestation = first.message.aggregate; + + let publish_status = self + .beacon_node + .http + .validator() + .publish_aggregate_and_proof(signed_aggregate_and_proofs) + .await + .map_err(|e| format!("Failed to publish aggregate and proofs: {:?}", e))?; + match publish_status { + PublishStatus::Valid => info!( + log_1, + "Successfully published attestations"; + "signatures" => attestation.aggregation_bits.num_set_bits(), + "head_block" => format!("{:?}", attestation.data.beacon_block_root), + "committee_index" => attestation.data.index, + "slot" => attestation.data.slot.as_u64(), + "type" => "aggregated", + ), + PublishStatus::Invalid(msg) => crit!( + log_1, + "Published attestation was invalid"; + "message" => msg, + "committee_index" => attestation.data.index, + "slot" => attestation.data.slot.as_u64(), + "type" => "aggregated", + ), + PublishStatus::Unknown => { + crit!(log_1, "Unknown condition when publishing agg. attestation") + } + }; + Ok(()) + } else { + debug!( + log_1, + "No signed aggregates to publish"; + "committee_index" => attestation.data.index, + "slot" => attestation.data.slot.as_u64(), + ); + Ok(()) + } } } @@ -591,16 +562,14 @@ mod tests { let state_1 = Arc::new(RwLock::new(in_the_past)); let state_2 = state_1.clone(); - let future = Delay::new(in_the_past) - .map_err(|_| panic!("Failed to create duration")) - .map(move |()| *state_1.write() = Instant::now()); + let future = delay_until(in_the_past).map(move |()| *state_1.write() = Instant::now()); let mut runtime = RuntimeBuilder::new() .core_threads(1) .build() .expect("failed to start runtime"); - runtime.block_on(future).expect("failed to complete future"); + runtime.block_on(future); assert!( *state_2.read() > in_the_past, diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index 14c7043a73..5e9daa6ff2 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -1,15 +1,14 @@ use crate::{duties_service::DutiesService, validator_store::ValidatorStore}; use environment::RuntimeContext; use exit_future::Signal; -use futures::{stream, Future, IntoFuture, Stream}; +use futures::{FutureExt, StreamExt}; use remote_beacon_node::{PublishStatus, RemoteBeaconNode}; use slog::{crit, error, info, trace}; use slot_clock::SlotClock; use std::ops::Deref; use std::sync::Arc; -use std::time::{Duration, Instant}; -use tokio::timer::Interval; -use types::{ChainSpec, EthSpec}; +use tokio::time::{interval_at, Duration, Instant}; +use types::{ChainSpec, EthSpec, PublicKey, Slot}; /// Delay this period of time after the slot starts. This allows the node to process the new slot. const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100); @@ -124,7 +123,8 @@ impl BlockService { let interval = { let slot_duration = Duration::from_millis(spec.milliseconds_per_slot); - Interval::new( + // Note: interval_at panics if slot_duration = 0 + interval_at( Instant::now() + duration_to_next_slot + TIME_DELAY_FROM_SLOT, slot_duration, ) @@ -132,135 +132,102 @@ impl BlockService { let (exit_signal, exit_fut) = exit_future::signal(); let service = self.clone(); - let log_1 = log.clone(); - let log_2 = log.clone(); + let interval_fut = interval.for_each(move |_| { + let _ = service.clone().do_update(); + futures::future::ready(()) + }); - self.context.executor.spawn( - exit_fut - .until( - interval - .map_err(move |e| { - crit! { - log_1, - "Timer thread failed"; - "error" => format!("{}", e) - } - }) - .for_each(move |_| service.clone().do_update().then(|_| Ok(()))), - ) - .map(move |_| info!(log_2, "Shutdown complete")), + let future = futures::future::select( + interval_fut, + exit_fut.map(move |_| info!(log, "Shutdown complete")), ); + tokio::task::spawn(future); Ok(exit_signal) } /// Attempt to produce a block for any block producers in the `ValidatorStore`. - fn do_update(self) -> impl Future { - let service = self.clone(); + fn do_update(&self) -> Result<(), ()> { let log_1 = self.context.log.clone(); let log_2 = self.context.log.clone(); - self.slot_clock - .now() - .ok_or_else(move || { - crit!(log_1, "Duties manager failed to read slot clock"); - }) - .into_future() - .and_then(move |slot| { - let iter = service.duties_service.block_producers(slot).into_iter(); + let slot = self.slot_clock.now().ok_or_else(move || { + crit!(log_1, "Duties manager failed to read slot clock"); + })?; + let iter = self.duties_service.block_producers(slot).into_iter(); - if iter.len() == 0 { - trace!( - log_2, - "No local block proposers for this slot"; - "slot" => slot.as_u64() - ) - } else if iter.len() > 1 { - error!( - log_2, - "Multiple block proposers for this slot"; - "action" => "producing blocks for all proposers", - "num_proposers" => iter.len(), - "slot" => slot.as_u64(), - ) - } + if iter.len() == 0 { + trace!( + log_2, + "No local block proposers for this slot"; + "slot" => slot.as_u64() + ) + } else if iter.len() > 1 { + error!( + log_2, + "Multiple block proposers for this slot"; + "action" => "producing blocks for all proposers", + "num_proposers" => iter.len(), + "slot" => slot.as_u64(), + ) + } - stream::unfold(iter, move |mut block_producers| { - let log_1 = service.context.log.clone(); - let log_2 = service.context.log.clone(); - let service_1 = service.clone(); - let service_2 = service.clone(); - let service_3 = service.clone(); + // TODO: check if the logic is same as stream::unfold version + let _ = futures::stream::iter(iter).for_each(|validator_pubkey| async { + match self.publish_block(slot, validator_pubkey).await { + Ok(()) => (), + Err(e) => crit!( + log_2, + "Error whilst producing block"; + "message" => e + ), + } + }); - block_producers.next().map(move |validator_pubkey| { - service_1 - .validator_store - .randao_reveal(&validator_pubkey, slot.epoch(E::slots_per_epoch())) - .ok_or_else(|| "Unable to produce randao reveal".to_string()) - .into_future() - .and_then(move |randao_reveal| { - service_1 - .beacon_node - .http - .validator() - .produce_block(slot, randao_reveal) - .map_err(|e| { - format!( - "Error from beacon node when producing block: {:?}", - e - ) - }) - }) - .and_then(move |block| { - service_2 - .validator_store - .sign_block(&validator_pubkey, block) - .ok_or_else(|| "Unable to sign block".to_string()) - }) - .and_then(move |block| { - service_3 - .beacon_node - .http - .validator() - .publish_block(block.clone()) - .map(|publish_status| (block, publish_status)) - .map_err(|e| { - format!( - "Error from beacon node when publishing block: {:?}", - e - ) - }) - }) - .map(move |(block, publish_status)| match publish_status { - PublishStatus::Valid => info!( - log_1, - "Successfully published block"; - "deposits" => block.message.body.deposits.len(), - "attestations" => block.message.body.attestations.len(), - "slot" => block.slot().as_u64(), - ), - PublishStatus::Invalid(msg) => crit!( - log_1, - "Published block was invalid"; - "message" => msg, - "slot" => block.slot().as_u64(), - ), - PublishStatus::Unknown => { - crit!(log_1, "Unknown condition when publishing block") - } - }) - .map_err(move |e| { - crit!( - log_2, - "Error whilst producing block"; - "message" => e - ) - }) - .then(|_| Ok(((), block_producers))) - }) - }) - .collect() - .map(|_| ()) - }) + Ok(()) + } + + /// Produce a block at the given slot for validator_pubkey + async fn publish_block(&self, slot: Slot, validator_pubkey: PublicKey) -> Result<(), String> { + let log_1 = self.context.log.clone(); + let randao_reveal = self + .validator_store + .randao_reveal(&validator_pubkey, slot.epoch(E::slots_per_epoch())) + .ok_or_else(|| "Unable to produce randao reveal".to_string())?; + let block = self + .beacon_node + .http + .validator() + .produce_block(slot, randao_reveal) + .await + .map_err(|e| format!("Error from beacon node when producing block: {:?}", e))?; + let signed_block = self + .validator_store + .sign_block(&validator_pubkey, block) + .ok_or_else(|| "Unable to sign block".to_string())?; + let publish_status = self + .beacon_node + .http + .validator() + .publish_block(signed_block.clone()) + .await + .map_err(|e| format!("Error from beacon node when publishing block: {:?}", e))?; + match publish_status { + PublishStatus::Valid => info!( + log_1, + "Successfully published block"; + "deposits" => signed_block.message.body.deposits.len(), + "attestations" => signed_block.message.body.attestations.len(), + "slot" => signed_block.slot().as_u64(), + ), + PublishStatus::Invalid(msg) => crit!( + log_1, + "Published block was invalid"; + "message" => msg, + "slot" => signed_block.slot().as_u64(), + ), + PublishStatus::Unknown => crit!(log_1, "Unknown condition when publishing block"), + } + Ok(()) } } diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index d14880b88e..e16f45a1fc 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -1,18 +1,17 @@ use crate::validator_store::ValidatorStore; use environment::RuntimeContext; use exit_future::Signal; -use futures::{future, Future, IntoFuture, Stream}; +use futures::{FutureExt, StreamExt}; use parking_lot::RwLock; use remote_beacon_node::{PublishStatus, RemoteBeaconNode}; use rest_types::{ValidatorDuty, ValidatorDutyBytes, ValidatorSubscription}; -use slog::{crit, debug, error, info, trace, warn}; +use slog::{debug, error, info, trace, warn}; use slot_clock::SlotClock; use std::collections::HashMap; use std::convert::TryInto; use std::ops::Deref; use std::sync::Arc; -use std::time::{Duration, Instant}; -use tokio::timer::Interval; +use tokio::time::{interval_at, Duration, Instant}; use types::{ChainSpec, CommitteeIndex, Epoch, EthSpec, PublicKey, SelectionProof, Slot}; /// Delay this period of time after the slot starts. This allows the node to process the new slot. @@ -440,54 +439,53 @@ impl DutiesService { let interval = { let slot_duration = Duration::from_millis(spec.milliseconds_per_slot); - Interval::new( + // Note: `interval_at` panics if `slot_duration` is 0 + interval_at( Instant::now() + duration_to_next_slot + TIME_DELAY_FROM_SLOT, slot_duration, ) }; let (exit_signal, exit_fut) = exit_future::signal(); - let service = self.clone(); - let log_1 = log.clone(); - let log_2 = log.clone(); + let service_1 = self.clone(); + let service_2 = self.clone(); // Run an immediate update before starting the updater service. - self.context.executor.spawn(service.clone().do_update()); + tokio::task::spawn(service_1.do_update()); - self.context.executor.spawn( - exit_fut - .until( - interval - .map_err(move |e| { - crit! { - log_1, - "Timer thread failed"; - "error" => format!("{}", e) - } - }) - .for_each(move |_| service.clone().do_update().then(|_| Ok(()))), - ) - .map(move |_| info!(log_2, "Shutdown complete")), + let interval_fut = interval.for_each(move |_| { + let _ = service_2.clone().do_update(); + futures::future::ready(()) + }); + + let future = futures::future::select( + interval_fut, + exit_fut.map(move |_| info!(log, "Shutdown complete")), ); + tokio::task::spawn(future); Ok(exit_signal) } /// Attempt to download the duties of all managed validators for this epoch and the next. - fn do_update(&self) -> impl Future { + async fn do_update(self) -> Result<(), ()> { + let log_1 = self.context.log.clone(); + let log_2 = self.context.log.clone(); + let log_3 = self.context.log.clone(); + let log = self.context.log.clone(); + let service_1 = self.clone(); let service_2 = self.clone(); let service_3 = self.clone(); let service_4 = self.clone(); - let log_1 = self.context.log.clone(); - let log_2 = self.context.log.clone(); + let service_5 = self.clone(); - self.slot_clock + let current_epoch = service_1 + .slot_clock .now() .ok_or_else(move || { error!(log_1, "Duties manager failed to read slot clock"); }) - .into_future() .map(move |slot| { let epoch = slot.epoch(E::slots_per_epoch()); @@ -501,218 +499,203 @@ impl DutiesService { "current_epoch" => epoch.as_u64(), ); - service_1.store.prune(prune_below); + self.store.prune(prune_below); } epoch - }) - .and_then(move |epoch| { - let log = service_2.context.log.clone(); + })?; - service_2 - .beacon_node - .http - .beacon() - .get_head() - .map(move |head| (epoch, head.slot.epoch(E::slots_per_epoch()))) - .map_err(move |e| { - error!( - log, - "Failed to contact beacon node"; - "error" => format!("{:?}", e) - ) - }) - }) - .and_then(move |(current_epoch, beacon_head_epoch)| { - let log = service_3.context.log.clone(); + let beacon_head_epoch = service_2 + .beacon_node + .http + .beacon() + .get_head() + .await + .map(|head| head.slot.epoch(E::slots_per_epoch())) + .map_err(move |e| { + error!( + log_3, + "Failed to contact beacon node"; + "error" => format!("{:?}", e) + ) + })?; - let future: Box + Send> = if beacon_head_epoch + 1 - < current_epoch - && !service_3.allow_unsynced_beacon_node - { + if beacon_head_epoch + 1 < current_epoch && !service_3.allow_unsynced_beacon_node { + error!( + log, + "Beacon node is not synced"; + "node_head_epoch" => format!("{}", beacon_head_epoch), + "current_epoch" => format!("{}", current_epoch), + ); + } else { + let result = service_4.clone().update_epoch(current_epoch).await; + if let Err(e) = result { + error!( + log, + "Failed to get current epoch duties"; + "http_error" => format!("{:?}", e) + ); + } + + service_5 + .clone() + .update_epoch(current_epoch + 1) + .await + .map_err(move |e| { error!( log, - "Beacon node is not synced"; - "node_head_epoch" => format!("{}", beacon_head_epoch), - "current_epoch" => format!("{}", current_epoch), + "Failed to get next epoch duties"; + "http_error" => format!("{:?}", e) ); - - Box::new(future::ok(())) - } else { - Box::new(service_3.update_epoch(current_epoch).then(move |result| { - if let Err(e) = result { - error!( - log, - "Failed to get current epoch duties"; - "http_error" => format!("{:?}", e) - ); - } - - let log = service_4.context.log.clone(); - service_4.update_epoch(current_epoch + 1).map_err(move |e| { - error!( - log, - "Failed to get next epoch duties"; - "http_error" => format!("{:?}", e) - ); - }) - })) - }; - - future - }) - .map(|_| ()) + })?; + }; + Ok(()) } /// Attempt to download the duties of all managed validators for the given `epoch`. - fn update_epoch(self, epoch: Epoch) -> impl Future { - let service_1 = self.clone(); - let service_2 = self.clone(); - let service_3 = self; - - let pubkeys = service_1.validator_store.voting_pubkeys(); - service_1 + async fn update_epoch(self, epoch: Epoch) -> Result<(), String> { + let pubkeys = self.validator_store.voting_pubkeys(); + let all_duties = self .beacon_node .http .validator() .get_duties(epoch, pubkeys.as_slice()) - .map(move |all_duties| (epoch, all_duties)) - .map_err(move |e| format!("Failed to get duties for epoch {}: {:?}", epoch, e)) - .and_then(move |(epoch, all_duties)| { - let log = service_2.context.log.clone(); + .await + .map_err(move |e| format!("Failed to get duties for epoch {}: {:?}", epoch, e))?; - let mut new_validator = 0; - let mut new_epoch = 0; - let mut identical = 0; - let mut replaced = 0; - let mut invalid = 0; + let log = self.context.log.clone(); - // For each of the duties, attempt to insert them into our local store and build a - // list of new or changed selections proofs for any aggregating validators. - let validator_subscriptions = all_duties.into_iter().filter_map(|remote_duties| { - // Convert the remote duties into our local representation. - let duties: DutyAndProof = remote_duties - .try_into() - .map_err(|e| error!( + let mut new_validator = 0; + let mut new_epoch = 0; + let mut identical = 0; + let mut replaced = 0; + let mut invalid = 0; + + // For each of the duties, attempt to insert them into our local store and build a + // list of new or changed selections proofs for any aggregating validators. + let validator_subscriptions = all_duties + .into_iter() + .filter_map(|remote_duties| { + // Convert the remote duties into our local representation. + let duties: DutyAndProof = remote_duties + .try_into() + .map_err(|e| { + error!( log, "Unable to convert remote duties"; "error" => e - )) - .ok()?; + ) + }) + .ok()?; - // Attempt to update our local store. - let outcome = service_2 - .store - .insert(epoch, duties.clone(), E::slots_per_epoch(), &service_2.validator_store) - .map_err(|e| error!( + // Attempt to update our local store. + let outcome = self + .store + .insert( + epoch, + duties.clone(), + E::slots_per_epoch(), + &self.validator_store, + ) + .map_err(|e| { + error!( log, "Unable to store duties"; "error" => e - )) - .ok()?; + ) + }) + .ok()?; - match &outcome { - InsertOutcome::NewValidator => { - debug!( - log, - "First duty assignment for validator"; - "proposal_slots" => format!("{:?}", &duties.duty.block_proposal_slots), - "attestation_slot" => format!("{:?}", &duties.duty.attestation_slot), - "validator" => format!("{:?}", &duties.duty.validator_pubkey) - ); - new_validator += 1; - } - InsertOutcome::NewEpoch => new_epoch += 1, - InsertOutcome::Identical => identical += 1, - InsertOutcome::Replaced { .. } => replaced += 1, - InsertOutcome::Invalid => invalid += 1, - }; - - if outcome.is_subscription_candidate() { - Some(ValidatorSubscription { - validator_index: duties.duty.validator_index?, - attestation_committee_index: duties.duty.attestation_committee_index?, - slot: duties.duty.attestation_slot?, - is_aggregator: duties.selection_proof.is_some(), - }) - } else { - None + match &outcome { + InsertOutcome::NewValidator => { + debug!( + log, + "First duty assignment for validator"; + "proposal_slots" => format!("{:?}", &duties.duty.block_proposal_slots), + "attestation_slot" => format!("{:?}", &duties.duty.attestation_slot), + "validator" => format!("{:?}", &duties.duty.validator_pubkey) + ); + new_validator += 1; } - }).collect::>(); + InsertOutcome::NewEpoch => new_epoch += 1, + InsertOutcome::Identical => identical += 1, + InsertOutcome::Replaced { .. } => replaced += 1, + InsertOutcome::Invalid => invalid += 1, + }; - if invalid > 0 { - error!( - log, - "Received invalid duties from beacon node"; - "bad_duty_count" => invalid, - "info" => "Duties are from wrong epoch." - ) - } - - trace!( - log, - "Performed duties update"; - "identical" => identical, - "new_epoch" => new_epoch, - "new_validator" => new_validator, - "replaced" => replaced, - "epoch" => format!("{}", epoch) - ); - - if replaced > 0 { - warn!( - log, - "Duties changed during routine update"; - "info" => "Chain re-org likely occurred." - ) - } - - Ok(validator_subscriptions) - }) - .and_then::<_, Box + Send>>(move |validator_subscriptions| { - let log = service_3.context.log.clone(); - let count = validator_subscriptions.len(); - - if count == 0 { - debug!( - log, - "No new subscriptions required" - ); - - Box::new(future::ok(())) + if outcome.is_subscription_candidate() { + Some(ValidatorSubscription { + validator_index: duties.duty.validator_index?, + attestation_committee_index: duties.duty.attestation_committee_index?, + slot: duties.duty.attestation_slot?, + is_aggregator: duties.selection_proof.is_some(), + }) } else { - Box::new(service_3.beacon_node - .http - .validator() - .subscribe(validator_subscriptions) - .map_err(|e| format!("Failed to subscribe validators: {:?}", e)) - .map(move |status| { - match status { - PublishStatus::Valid => { - debug!( - log, - "Successfully subscribed validators"; - "count" => count - ) - }, - PublishStatus::Unknown => { - error!( - log, - "Unknown response from subscription"; - ) - }, - PublishStatus::Invalid(e) => { - error!( - log, - "Failed to subscribe validator"; - "error" => e - ) - }, - }; - })) + None } - }) + .collect::>(); + + if invalid > 0 { + error!( + log, + "Received invalid duties from beacon node"; + "bad_duty_count" => invalid, + "info" => "Duties are from wrong epoch." + ) + } + + trace!( + log, + "Performed duties update"; + "identical" => identical, + "new_epoch" => new_epoch, + "new_validator" => new_validator, + "replaced" => replaced, + "epoch" => format!("{}", epoch) + ); + + if replaced > 0 { + warn!( + log, + "Duties changed during routine update"; + "info" => "Chain re-org likely occurred." + ) + } + + let log = self.context.log.clone(); + let count = validator_subscriptions.len(); + + if count == 0 { + debug!(log, "No new subscriptions required"); + + Ok(()) + } else { + self.beacon_node + .http + .validator() + .subscribe(validator_subscriptions) + .await + .map_err(|e| format!("Failed to subscribe validators: {:?}", e)) + .map(move |status| { + match status { + PublishStatus::Valid => debug!( + log, + "Successfully subscribed validators"; + "count" => count + ), + PublishStatus::Unknown => error!( + log, + "Unknown response from subscription"; + ), + PublishStatus::Invalid(e) => error!( + log, + "Failed to subscribe validator"; + "error" => e + ), + }; + }) + } } } diff --git a/validator_client/src/fork_service.rs b/validator_client/src/fork_service.rs index 9ff3a0bf56..412c0272fe 100644 --- a/validator_client/src/fork_service.rs +++ b/validator_client/src/fork_service.rs @@ -1,14 +1,13 @@ use environment::RuntimeContext; use exit_future::Signal; -use futures::{Future, Stream}; +use futures::{FutureExt, StreamExt}; use parking_lot::RwLock; use remote_beacon_node::RemoteBeaconNode; -use slog::{crit, info, trace}; +use slog::{info, trace}; use slot_clock::SlotClock; use std::ops::Deref; use std::sync::Arc; -use std::time::{Duration, Instant}; -use tokio::timer::Interval; +use tokio::time::{interval_at, Duration, Instant}; use types::{ChainSpec, EthSpec, Fork}; /// Delay this period of time after the slot starts. This allows the node to process the new slot. @@ -111,51 +110,46 @@ impl ForkService { let interval = { let slot_duration = Duration::from_millis(spec.milliseconds_per_slot); - Interval::new( + // Note: interval_at panics if `slot_duration * E::slots_per_epoch()` = 0 + interval_at( Instant::now() + duration_to_next_epoch + TIME_DELAY_FROM_SLOT, slot_duration * E::slots_per_epoch() as u32, ) }; let (exit_signal, exit_fut) = exit_future::signal(); - let service = self.clone(); - let log_1 = log.clone(); - let log_2 = log.clone(); // Run an immediate update before starting the updater service. - self.context.executor.spawn(service.clone().do_update()); + let service_1 = self.clone(); + let service_2 = self.clone(); + tokio::task::spawn(service_1.do_update()); - self.context.executor.spawn( - exit_fut - .until( - interval - .map_err(move |e| { - crit! { - log_1, - "Timer thread failed"; - "error" => format!("{}", e) - } - }) - .for_each(move |_| service.do_update().then(|_| Ok(()))), - ) - .map(move |_| info!(log_2, "Shutdown complete")), + let interval_fut = interval.for_each(move |_| { + let _ = service_2.clone().do_update(); + futures::future::ready(()) + }); + + let future = futures::future::select( + interval_fut, + exit_fut.map(move |_| info!(log, "Shutdown complete")), ); + tokio::task::spawn(future); Ok(exit_signal) } /// Attempts to download the `Fork` from the server. - fn do_update(&self) -> impl Future { - let service_1 = self.clone(); - let log_1 = service_1.context.log.clone(); - let log_2 = service_1.context.log.clone(); - - self.inner + async fn do_update(self) -> Result<(), ()> { + let log_1 = self.context.log.clone(); + let log_2 = self.context.log.clone(); + let _ = self + .inner .beacon_node .http .beacon() .get_fork() - .map(move |fork| *(service_1.fork.write()) = Some(fork)) + .await + .map(move |fork| *(self.fork.write()) = Some(fork)) .map(move |_| trace!(log_1, "Fork update success")) .map_err(move |e| { trace!( @@ -163,9 +157,9 @@ impl ForkService { "Fork update failed"; "error" => format!("Error retrieving fork: {:?}", e) ) - }) - // Returning an error will stop the interval. This is not desired, a single failure - // should not stop all future attempts. - .then(|_| Ok(())) + }); + // Returning an error will stop the interval. This is not desired, a single failure + // should not stop all future attempts. + Ok(()) } } diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index ec7e2a743d..d4332dcbcf 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -19,18 +19,13 @@ use duties_service::{DutiesService, DutiesServiceBuilder}; use environment::RuntimeContext; use exit_future::Signal; use fork_service::{ForkService, ForkServiceBuilder}; -use futures::{ - future::{self, loop_fn, Loop}, - Future, IntoFuture, -}; use notifier::spawn_notifier; use remote_beacon_node::RemoteBeaconNode; use slog::{error, info, Logger}; use slot_clock::SlotClock; use slot_clock::SystemTimeSlotClock; -use std::time::{Duration, Instant}; use std::time::{SystemTime, UNIX_EPOCH}; -use tokio::timer::Delay; +use tokio::time::{delay_for, Duration}; use types::EthSpec; use validator_store::ValidatorStore; @@ -52,22 +47,18 @@ pub struct ProductionValidatorClient { impl ProductionValidatorClient { /// Instantiates the validator client, _without_ starting the timers to trigger block /// and attestation production. - pub fn new_from_cli( + pub async fn new_from_cli( context: RuntimeContext, - cli_args: &ArgMatches, - ) -> impl Future { - Config::from_cli(&cli_args) - .into_future() - .map_err(|e| format!("Unable to initialize config: {}", e)) - .and_then(|config| Self::new(context, config)) + cli_args: &ArgMatches<'_>, + ) -> Result { + let config = Config::from_cli(&cli_args) + .map_err(|e| format!("Unable to initialize config: {}", e))?; + Self::new(context, config).await } /// Instantiates the validator client, _without_ starting the timers to trigger block /// and attestation production. - pub fn new( - mut context: RuntimeContext, - config: Config, - ) -> impl Future { + pub async fn new(mut context: RuntimeContext, config: Config) -> Result { let log_1 = context.log.clone(); let log_2 = context.log.clone(); let log_3 = context.log.clone(); @@ -80,184 +71,153 @@ impl ProductionValidatorClient { "datadir" => format!("{:?}", config.data_dir), ); - RemoteBeaconNode::new_with_timeout(config.http_server.clone(), HTTP_TIMEOUT) - .map_err(|e| format!("Unable to init beacon node http client: {}", e)) - .into_future() - .and_then(move |beacon_node| wait_for_node(beacon_node, log_2)) - .and_then(|beacon_node| { - beacon_node - .http - .spec() - .get_eth2_config() - .map(|eth2_config| (beacon_node, eth2_config)) - .map_err(|e| format!("Unable to read eth2 config from beacon node: {:?}", e)) - }) - .and_then(|(beacon_node, eth2_config)| { - beacon_node - .http - .beacon() - .get_genesis_time() - .map(|genesis_time| (beacon_node, eth2_config, genesis_time)) - .map_err(|e| format!("Unable to read genesis time from beacon node: {:?}", e)) - }) - .and_then(move |(beacon_node, remote_eth2_config, genesis_time)| { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .into_future() - .map_err(|e| format!("Unable to read system time: {:?}", e)) - .and_then::<_, Box + Send>>(move |now| { - let log = log_3.clone(); - let genesis = Duration::from_secs(genesis_time); + let beacon_node = + RemoteBeaconNode::new_with_timeout(config.http_server.clone(), HTTP_TIMEOUT) + .map_err(|e| format!("Unable to init beacon node http client: {}", e))?; - // If the time now is less than (prior to) genesis, then delay until the - // genesis instant. - // - // If the validator client starts before genesis, it will get errors from - // the slot clock. - if now < genesis { - info!( - log, - "Starting node prior to genesis"; - "seconds_to_wait" => (genesis - now).as_secs() - ); + // TODO: check if all logs in wait_for_node are produed while awaiting + let beacon_node = wait_for_node(beacon_node, log_2).await?; + let eth2_config = beacon_node + .http + .spec() + .get_eth2_config() + .await + .map_err(|e| format!("Unable to read eth2 config from beacon node: {:?}", e))?; + let genesis_time = beacon_node + .http + .beacon() + .get_genesis_time() + .await + .map_err(|e| format!("Unable to read genesis time from beacon node: {:?}", e))?; + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|e| format!("Unable to read system time: {:?}", e))?; + let log = log_3.clone(); + let genesis = Duration::from_secs(genesis_time); - Box::new( - Delay::new(Instant::now() + (genesis - now)) - .map_err(|e| { - format!("Unable to create genesis wait delay: {:?}", e) - }) - .map(move |_| (beacon_node, remote_eth2_config, genesis_time)), - ) - } else { - info!( - log, - "Genesis has already occurred"; - "seconds_ago" => (now - genesis).as_secs() - ); + // If the time now is less than (prior to) genesis, then delay until the + // genesis instant. + // + // If the validator client starts before genesis, it will get errors from + // the slot clock. + if now < genesis { + info!( + log, + "Starting node prior to genesis"; + "seconds_to_wait" => (genesis - now).as_secs() + ); - Box::new(future::ok((beacon_node, remote_eth2_config, genesis_time))) - } - }) - }) - .and_then(|(beacon_node, eth2_config, genesis_time)| { - beacon_node - .http - .beacon() - .get_genesis_validators_root() - .map(move |genesis_validators_root| { - ( - beacon_node, - eth2_config, - genesis_time, - genesis_validators_root, - ) - }) - .map_err(|e| { - format!( - "Unable to read genesis validators root from beacon node: {:?}", - e - ) - }) - }) - .and_then( - move |(beacon_node, remote_eth2_config, genesis_time, genesis_validators_root)| { - let log = log_4.clone(); + delay_for(genesis - now).await + } else { + info!( + log, + "Genesis has already occurred"; + "seconds_ago" => (now - genesis).as_secs() + ); + } + let genesis_validators_root = beacon_node + .http + .beacon() + .get_genesis_validators_root() + .await + .map_err(|e| { + format!( + "Unable to read genesis validators root from beacon node: {:?}", + e + ) + })?; + let log = log_4.clone(); - // Do not permit a connection to a beacon node using different spec constants. - if context.eth2_config.spec_constants != remote_eth2_config.spec_constants { - return Err(format!( - "Beacon node is using an incompatible spec. Got {}, expected {}", - remote_eth2_config.spec_constants, context.eth2_config.spec_constants - )); - } + // Do not permit a connection to a beacon node using different spec constants. + if context.eth2_config.spec_constants != eth2_config.spec_constants { + return Err(format!( + "Beacon node is using an incompatible spec. Got {}, expected {}", + eth2_config.spec_constants, context.eth2_config.spec_constants + )); + } - // Note: here we just assume the spec variables of the remote node. This is very useful - // for testnets, but perhaps a security issue when it comes to mainnet. - // - // A damaging attack would be for a beacon node to convince the validator client of a - // different `SLOTS_PER_EPOCH` variable. This could result in slashable messages being - // produced. We are safe from this because `SLOTS_PER_EPOCH` is a type-level constant - // for Lighthouse. - context.eth2_config = remote_eth2_config; + // Note: here we just assume the spec variables of the remote node. This is very useful + // for testnets, but perhaps a security issue when it comes to mainnet. + // + // A damaging attack would be for a beacon node to convince the validator client of a + // different `SLOTS_PER_EPOCH` variable. This could result in slashable messages being + // produced. We are safe from this because `SLOTS_PER_EPOCH` is a type-level constant + // for Lighthouse. + context.eth2_config = eth2_config; - let slot_clock = SystemTimeSlotClock::new( - context.eth2_config.spec.genesis_slot, - Duration::from_secs(genesis_time), - Duration::from_millis(context.eth2_config.spec.milliseconds_per_slot), - ); + let slot_clock = SystemTimeSlotClock::new( + context.eth2_config.spec.genesis_slot, + Duration::from_secs(genesis_time), + Duration::from_millis(context.eth2_config.spec.milliseconds_per_slot), + ); - let fork_service = ForkServiceBuilder::new() - .slot_clock(slot_clock.clone()) - .beacon_node(beacon_node.clone()) - .runtime_context(context.service_context("fork".into())) - .build()?; + let fork_service = ForkServiceBuilder::new() + .slot_clock(slot_clock.clone()) + .beacon_node(beacon_node.clone()) + .runtime_context(context.service_context("fork".into())) + .build()?; - let validator_store: ValidatorStore = - match &config.key_source { - // Load pre-existing validators from the data dir. - // - // Use the `account_manager` to generate these files. - KeySource::Disk => ValidatorStore::load_from_disk( - config.data_dir.clone(), - genesis_validators_root, - context.eth2_config.spec.clone(), - fork_service.clone(), - log.clone(), - )?, - // Generate ephemeral insecure keypairs for testing purposes. - // - // Do not use in production. - KeySource::InsecureKeypairs(indices) => { - ValidatorStore::insecure_ephemeral_validators( - &indices, - genesis_validators_root, - context.eth2_config.spec.clone(), - fork_service.clone(), - log.clone(), - )? - } - }; + let validator_store: ValidatorStore = match &config.key_source { + // Load pre-existing validators from the data dir. + // + // Use the `account_manager` to generate these files. + KeySource::Disk => ValidatorStore::load_from_disk( + config.data_dir.clone(), + genesis_validators_root, + context.eth2_config.spec.clone(), + fork_service.clone(), + log.clone(), + )?, + // Generate ephemeral insecure keypairs for testing purposes. + // + // Do not use in production. + KeySource::InsecureKeypairs(indices) => ValidatorStore::insecure_ephemeral_validators( + &indices, + genesis_validators_root, + context.eth2_config.spec.clone(), + fork_service.clone(), + log.clone(), + )?, + }; - info!( - log, - "Loaded validator keypair store"; - "voting_validators" => validator_store.num_voting_validators() - ); + info!( + log, + "Loaded validator keypair store"; + "voting_validators" => validator_store.num_voting_validators() + ); - let duties_service = DutiesServiceBuilder::new() - .slot_clock(slot_clock.clone()) - .validator_store(validator_store.clone()) - .beacon_node(beacon_node.clone()) - .runtime_context(context.service_context("duties".into())) - .allow_unsynced_beacon_node(config.allow_unsynced_beacon_node) - .build()?; + let duties_service = DutiesServiceBuilder::new() + .slot_clock(slot_clock.clone()) + .validator_store(validator_store.clone()) + .beacon_node(beacon_node.clone()) + .runtime_context(context.service_context("duties".into())) + .allow_unsynced_beacon_node(config.allow_unsynced_beacon_node) + .build()?; - let block_service = BlockServiceBuilder::new() - .duties_service(duties_service.clone()) - .slot_clock(slot_clock.clone()) - .validator_store(validator_store.clone()) - .beacon_node(beacon_node.clone()) - .runtime_context(context.service_context("block".into())) - .build()?; + let block_service = BlockServiceBuilder::new() + .duties_service(duties_service.clone()) + .slot_clock(slot_clock.clone()) + .validator_store(validator_store.clone()) + .beacon_node(beacon_node.clone()) + .runtime_context(context.service_context("block".into())) + .build()?; - let attestation_service = AttestationServiceBuilder::new() - .duties_service(duties_service.clone()) - .slot_clock(slot_clock) - .validator_store(validator_store) - .beacon_node(beacon_node) - .runtime_context(context.service_context("attestation".into())) - .build()?; + let attestation_service = AttestationServiceBuilder::new() + .duties_service(duties_service.clone()) + .slot_clock(slot_clock) + .validator_store(validator_store) + .beacon_node(beacon_node) + .runtime_context(context.service_context("attestation".into())) + .build()?; - Ok(Self { - context, - duties_service, - fork_service, - block_service, - attestation_service, - exit_signals: vec![], - }) - }, - ) + Ok(Self { + context, + duties_service, + fork_service, + block_service, + attestation_service, + exit_signals: vec![], + }) } pub fn start_service(&mut self) -> Result<(), String> { @@ -298,48 +258,39 @@ impl ProductionValidatorClient { /// Request the version from the node, looping back and trying again on failure. Exit once the node /// has been contacted. -fn wait_for_node( +async fn wait_for_node( beacon_node: RemoteBeaconNode, log: Logger, -) -> impl Future, Error = String> { +) -> Result, String> { // Try to get the version string from the node, looping until success is returned. - loop_fn(beacon_node.clone(), move |beacon_node| { + loop { let log = log.clone(); - beacon_node + let result = beacon_node .clone() .http .node() .get_version() - .map_err(|e| format!("{:?}", e)) - .then(move |result| { - let future: Box, Error = String> + Send> = match result - { - Ok(version) => { - info!( - log, - "Connected to beacon node"; - "version" => version, - ); + .await + .map_err(|e| format!("{:?}", e)); - Box::new(future::ok(Loop::Break(beacon_node))) - } - Err(e) => { - error!( - log, - "Unable to connect to beacon node"; - "error" => format!("{:?}", e), - ); + match result { + Ok(version) => { + info!( + log, + "Connected to beacon node"; + "version" => version, + ); - Box::new( - Delay::new(Instant::now() + RETRY_DELAY) - .map_err(|e| format!("Failed to trigger delay: {:?}", e)) - .and_then(|_| future::ok(Loop::Continue(beacon_node))), - ) - } - }; - - future - }) - }) - .map(|_| beacon_node) + return Ok(beacon_node); + } + Err(e) => { + error!( + log, + "Unable to connect to beacon node"; + "error" => format!("{:?}", e), + ); + delay_for(RETRY_DELAY).await; + } + } + } } diff --git a/validator_client/src/notifier.rs b/validator_client/src/notifier.rs index e60bf8cc62..9b611a4c30 100644 --- a/validator_client/src/notifier.rs +++ b/validator_client/src/notifier.rs @@ -1,10 +1,9 @@ use crate::ProductionValidatorClient; use exit_future::Signal; -use futures::{Future, Stream}; +use futures::{FutureExt, StreamExt}; use slog::{error, info}; use slot_clock::SlotClock; -use std::time::{Duration, Instant}; -use tokio::timer::Interval; +use tokio::time::{interval_at, Duration, Instant}; use types::EthSpec; /// Spawns a notifier service which periodically logs information about the node. @@ -26,66 +25,63 @@ pub fn spawn_notifier(client: &ProductionValidatorClient) -> Resu let duties_service = client.duties_service.clone(); let log_1 = context.log.clone(); - let log_2 = context.log.clone(); - let interval_future = Interval::new(start_instant, interval_duration) - .map_err( - move |e| error!(log_1, "Slot notifier timer failed"; "error" => format!("{:?}", e)), - ) - .for_each(move |_| { - let log = log_2.clone(); + // Note: interval_at panics if `interval_duration` is 0 + let interval_fut = interval_at(start_instant, interval_duration).for_each(move |_| { + let log = log_1.clone(); - if let Some(slot) = duties_service.slot_clock.now() { - let epoch = slot.epoch(T::slots_per_epoch()); + if let Some(slot) = duties_service.slot_clock.now() { + let epoch = slot.epoch(T::slots_per_epoch()); - let total_validators = duties_service.total_validator_count(); - let proposing_validators = duties_service.proposer_count(epoch); - let attesting_validators = duties_service.attester_count(epoch); + let total_validators = duties_service.total_validator_count(); + let proposing_validators = duties_service.proposer_count(epoch); + let attesting_validators = duties_service.attester_count(epoch); - if total_validators == 0 { - error!(log, "No validators present") - } else if total_validators == attesting_validators { - info!( - log_2, - "All validators active"; - "proposers" => proposing_validators, - "active_validators" => attesting_validators, - "total_validators" => total_validators, - "epoch" => format!("{}", epoch), - "slot" => format!("{}", slot), - ); - } else if attesting_validators > 0 { - info!( - log_2, - "Some validators active"; - "proposers" => proposing_validators, - "active_validators" => attesting_validators, - "total_validators" => total_validators, - "epoch" => format!("{}", epoch), - "slot" => format!("{}", slot), - ); - } else { - info!( - log_2, - "Awaiting activation"; - "validators" => total_validators, - "epoch" => format!("{}", epoch), - "slot" => format!("{}", slot), - ); - } + if total_validators == 0 { + error!(log, "No validators present") + } else if total_validators == attesting_validators { + info!( + log_1, + "All validators active"; + "proposers" => proposing_validators, + "active_validators" => attesting_validators, + "total_validators" => total_validators, + "epoch" => format!("{}", epoch), + "slot" => format!("{}", slot), + ); + } else if attesting_validators > 0 { + info!( + log_1, + "Some validators active"; + "proposers" => proposing_validators, + "active_validators" => attesting_validators, + "total_validators" => total_validators, + "epoch" => format!("{}", epoch), + "slot" => format!("{}", slot), + ); } else { - error!(log, "Unable to read slot clock"); + info!( + log_1, + "Awaiting activation"; + "validators" => total_validators, + "epoch" => format!("{}", epoch), + "slot" => format!("{}", slot), + ); } + } else { + error!(log, "Unable to read slot clock"); + } - Ok(()) - }); + futures::future::ready(()) + }); let (exit_signal, exit) = exit_future::signal(); let log = context.log.clone(); - client.context.executor.spawn( - exit.until(interval_future) - .map(move |_| info!(log, "Shutdown complete")), + let future = futures::future::select( + interval_fut, + exit.map(move |_| info!(log, "Shutdown complete")), ); + tokio::task::spawn(future); Ok(exit_signal) } diff --git a/validator_client/src/validator_directory.rs b/validator_client/src/validator_directory.rs index 197e1cb44e..35bf580d89 100644 --- a/validator_client/src/validator_directory.rs +++ b/validator_client/src/validator_directory.rs @@ -1,6 +1,6 @@ use bls::get_withdrawal_credentials; use deposit_contract::{encode_eth1_tx_data, DEPOSIT_GAS}; -use futures::{Future, IntoFuture}; +use futures::compat::Future01CompatExt; use hex; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; @@ -303,29 +303,27 @@ impl ValidatorDirectoryBuilder { Ok(self) } - pub fn submit_eth1_deposit( - self, + pub async fn submit_eth1_deposit( + &self, web3: Web3, from: Address, deposit_contract: Address, - ) -> impl Future { - self.get_deposit_data() - .into_future() - .and_then(move |(deposit_data, deposit_amount)| { - web3.eth() - .send_transaction(TransactionRequest { - from, - to: Some(deposit_contract), - gas: Some(DEPOSIT_GAS.into()), - gas_price: None, - value: Some(from_gwei(deposit_amount)), - data: Some(deposit_data.into()), - nonce: None, - condition: None, - }) - .map_err(|e| format!("Failed to send transaction: {:?}", e)) + ) -> Result { + let (deposit_data, deposit_amount) = self.get_deposit_data()?; + web3.eth() + .send_transaction(TransactionRequest { + from, + to: Some(deposit_contract), + gas: Some(DEPOSIT_GAS.into()), + gas_price: None, + value: Some(from_gwei(deposit_amount)), + data: Some(deposit_data.into()), + nonce: None, + condition: None, }) - .map(|tx| (self, tx)) + .compat() + .await + .map_err(|e| format!("Failed to send transaction: {:?}", e)) } pub fn build(self) -> Result {