diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index 4f57800b14..0bd3ae87f3 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -2,13 +2,13 @@ use crate::metrics; use beacon_chain::{BeaconChain, BeaconChainTypes}; use environment::RuntimeContext; use eth2_libp2p::NetworkGlobals; -use futures::{Future, Stream}; +use futures::{FutureExt, StreamExt, TryFutureExt}; use parking_lot::Mutex; use slog::{debug, error, info, warn}; use slot_clock::SlotClock; use std::sync::Arc; -use std::time::{Duration, Instant}; -use tokio::timer::Interval; +use std::time::Duration; +use tokio::time::{interval_at, Instant}; use types::{EthSpec, Slot}; /// Create a warning log whenever the peer count is at or below this value. @@ -32,9 +32,7 @@ pub fn spawn_notifier( network: Arc>, milliseconds_per_slot: u64, ) -> Result, String> { - let log_1 = context.log.clone(); - let log_2 = context.log.clone(); - let log_3 = context.log.clone(); + let log = context.log.clone(); let slot_duration = Duration::from_millis(milliseconds_per_slot); let duration_to_next_slot = beacon_chain @@ -50,82 +48,93 @@ pub fn spawn_notifier( let speedo = Mutex::new(Speedo::default()); - let interval_future = Interval::new(start_instant, interval_duration) - .map_err( - move |e| error!(log_1, "Slot notifier timer failed"; "error" => format!("{:?}", e)), - ) - .for_each(move |_| { - let log = log_2.clone(); + // Note: `interval_at` panics when interval_duration is 0 + // TODO: `Return type of closure passed to `for_each` is restricted to `Future` + // Hence, shifting the .then() error logs into the `for_each` closure. + // Can be solved with `TryStreamExt::try_for_each` if `Interval` implemented `TryStream`. + let interval_future = interval_at(start_instant, interval_duration).for_each(|_| { + let connected_peer_count = network.connected_peers(); - let connected_peer_count = network.connected_peers(); - - let head_info = beacon_chain.head_info() - .map_err(|e| error!( - log, - "Failed to get beacon chain head info"; - "error" => format!("{:?}", e) - ))?; - - let head_slot = head_info.slot; - let head_epoch = head_slot.epoch(T::EthSpec::slots_per_epoch()); - let current_slot = beacon_chain.slot().map_err(|e| { + let head_info = match beacon_chain.head_info() { + Ok(head) => head, + Err(e) => { error!( log, - "Unable to read current slot"; + "Notifier failed to notify, Failed to get beacon chain head info"; "error" => format!("{:?}", e) - ) - })?; - let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch()); - let finalized_epoch = head_info.finalized_checkpoint.epoch; - let finalized_root = head_info.finalized_checkpoint.root; - let head_root = head_info.block_root; - - let mut speedo = speedo.lock(); - speedo.observe(head_slot, Instant::now()); - - metrics::set_gauge(&metrics::SYNC_SLOTS_PER_SECOND, speedo.slots_per_second().unwrap_or_else(|| 0_f64) as i64); - - // The next two lines take advantage of saturating subtraction on `Slot`. - let head_distance = current_slot - head_slot; - - if connected_peer_count <= WARN_PEER_COUNT { - warn!(log, "Low peer count"; "peer_count" => peer_count_pretty(connected_peer_count)); + ); + return futures::future::ready(()); } + }; - debug!( - log, - "Slot timer"; - "peers" => peer_count_pretty(connected_peer_count), - "finalized_root" => format!("{}", finalized_root), - "finalized_epoch" => finalized_epoch, - "head_block" => format!("{}", head_root), - "head_slot" => head_slot, - "current_slot" => current_slot, + let head_slot = head_info.slot; + let head_epoch = head_slot.epoch(T::EthSpec::slots_per_epoch()); + let current_slot = match beacon_chain.slot() { + Ok(slot) => slot, + Err(e) => { + error!( + log, + "Notify failed to notify, Unable to read current slot"; + "error" => format!("{:?}", e) + ); + return futures::future::ready(()); + } + }; + + let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch()); + let finalized_epoch = head_info.finalized_checkpoint.epoch; + let finalized_root = head_info.finalized_checkpoint.root; + let head_root = head_info.block_root; + + let mut speedo = speedo.lock(); + speedo.observe(head_slot, Instant::now()); + + metrics::set_gauge( + &metrics::SYNC_SLOTS_PER_SECOND, + speedo.slots_per_second().unwrap_or_else(|| 0_f64) as i64, + ); + + // The next two lines take advantage of saturating subtraction on `Slot`. + let head_distance = current_slot - head_slot; + + if connected_peer_count <= WARN_PEER_COUNT { + warn!(log, "Low peer count"; "peer_count" => peer_count_pretty(connected_peer_count)); + } + + debug!( + log, + "Slot timer"; + "peers" => peer_count_pretty(connected_peer_count), + "finalized_root" => format!("{}", finalized_root), + "finalized_epoch" => finalized_epoch, + "head_block" => format!("{}", head_root), + "head_slot" => head_slot, + "current_slot" => current_slot, + ); + + if head_epoch + 1 < current_epoch { + let distance = format!( + "{} slots ({})", + head_distance.as_u64(), + slot_distance_pretty(head_distance, slot_duration) ); - if head_epoch + 1 < current_epoch { - let distance = format!( - "{} slots ({})", - head_distance.as_u64(), - slot_distance_pretty(head_distance, slot_duration) - ); + info!( + log, + "Syncing"; + "peers" => peer_count_pretty(connected_peer_count), + "distance" => distance, + "speed" => sync_speed_pretty(speedo.slots_per_second()), + "est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(current_slot)), + ); - info!( - log, - "Syncing"; - "peers" => peer_count_pretty(connected_peer_count), - "distance" => distance, - "speed" => sync_speed_pretty(speedo.slots_per_second()), - "est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(current_slot)), - ); + return futures::future::ready(()); + }; - return Ok(()); - }; - - macro_rules! not_quite_synced_log { + macro_rules! not_quite_synced_log { ($message: expr) => { info!( - log_2, + log, $message; "peers" => peer_count_pretty(connected_peer_count), "finalized_root" => format!("{}", finalized_root), @@ -136,41 +145,31 @@ pub fn spawn_notifier( } } - if head_epoch + 1 == current_epoch { - not_quite_synced_log!("Synced to previous epoch") - } else if head_slot != current_slot { - not_quite_synced_log!("Synced to current epoch") - } else { - info!( - log_2, - "Synced"; - "peers" => peer_count_pretty(connected_peer_count), - "finalized_root" => format!("{}", finalized_root), - "finalized_epoch" => finalized_epoch, - "epoch" => current_epoch, - "slot" => current_slot, - ); - }; + if head_epoch + 1 == current_epoch { + not_quite_synced_log!("Synced to previous epoch") + } else if head_slot != current_slot { + not_quite_synced_log!("Synced to current epoch") + } else { + info!( + log, + "Synced"; + "peers" => peer_count_pretty(connected_peer_count), + "finalized_root" => format!("{}", finalized_root), + "finalized_epoch" => finalized_epoch, + "epoch" => current_epoch, + "slot" => current_slot, + ); + }; - Ok(()) - }) - .then(move |result| { - match result { - Ok(()) => Ok(()), - Err(e) => { - error!( - log_3, - "Notifier failed to notify"; - "error" => format!("{:?}", e) - ); - Ok(()) - } } }); + futures::future::ready(()) + }); let (exit_signal, exit) = tokio::sync::oneshot::channel(); - context - .executor - .spawn(interval_future.select(exit).map(|_| ()).map_err(|_| ())); + let future = futures::future::select(interval_future, exit.map_err(|_| ()).map(|_| ())); + + // TODO: the runtime handle should spawn this future. + tokio::task::spawn(future); Ok(exit_signal) }