diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index ace8a91012..b6354cf870 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -366,13 +366,17 @@ where .ok_or_else(|| "slot_notifier requires a chain spec".to_string())? .milliseconds_per_slot; - let exit_channel = spawn_notifier( - context, - beacon_chain, - network_globals, - milliseconds_per_slot, - ) - .map_err(|e| format!("Unable to start slot notifier: {}", e))?; + let exit_channel = context + .runtime_handle + .enter(|| { + spawn_notifier( + beacon_chain, + network_globals, + milliseconds_per_slot, + context.log.clone(), + ) + }) + .map_err(|e| format!("Unable to start slot notifier: {}", e))?; self.exit_channels.push(exit_channel); diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index 1bb649b99e..add49b96d9 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -1,14 +1,12 @@ use crate::metrics; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use environment::RuntimeContext; use eth2_libp2p::NetworkGlobals; -use futures::{Future, Stream}; +use futures::prelude::*; use parking_lot::Mutex; use slog::{debug, error, info, warn}; use slot_clock::SlotClock; use std::sync::Arc; use std::time::{Duration, Instant}; -use tokio::timer::Interval; use types::{EthSpec, Slot}; /// Create a warning log whenever the peer count is at or below this value. @@ -27,15 +25,11 @@ const SPEEDO_OBSERVATIONS: usize = 4; /// Spawns a notifier service which periodically logs information about the node. pub fn spawn_notifier( - context: RuntimeContext, beacon_chain: Arc>, network: Arc>, milliseconds_per_slot: u64, + log: slog::Logger, ) -> Result, String> { - let log_1 = context.log.clone(); - let log_2 = context.log.clone(); - let log_3 = context.log.clone(); - let slot_duration = Duration::from_millis(milliseconds_per_slot); let duration_to_next_slot = beacon_chain .slot_clock @@ -43,29 +37,26 @@ pub fn spawn_notifier( .ok_or_else(|| "slot_notifier unable to determine time to next slot")?; // Run this half way through each slot. - let start_instant = Instant::now() + duration_to_next_slot + (slot_duration / 2); + let start_instant = tokio::time::Instant::now() + duration_to_next_slot + (slot_duration / 2); // Run this each slot. let interval_duration = slot_duration; let speedo = Mutex::new(Speedo::default()); + let interval = tokio::time::interval_at(start_instant, interval_duration); - let interval_future = Interval::new(start_instant, interval_duration) - .map_err( - move |e| error!(log_1, "Slot notifier timer failed"; "error" => format!("{:?}", e)), - ) - .for_each(move |_| { - let log = log_2.clone(); - + let interval_future = async move { + while let Some(_) = interval.next().await { let connected_peer_count = network.connected_peers(); let sync_state = network.sync_state(); - let head_info = beacon_chain.head_info() - .map_err(|e| error!( + let head_info = beacon_chain.head_info().map_err(|e| { + error!( log, "Failed to get beacon chain head info"; "error" => format!("{:?}", e) - ))?; + ) + })?; let head_slot = head_info.slot; let current_slot = beacon_chain.slot().map_err(|e| { @@ -83,7 +74,10 @@ pub fn spawn_notifier( let mut speedo = speedo.lock(); speedo.observe(head_slot, Instant::now()); - metrics::set_gauge(&metrics::SYNC_SLOTS_PER_SECOND, speedo.slots_per_second().unwrap_or_else(|| 0_f64) as i64); + metrics::set_gauge( + &metrics::SYNC_SLOTS_PER_SECOND, + speedo.slots_per_second().unwrap_or_else(|| 0_f64) as i64, + ); // The next two lines take advantage of saturating subtraction on `Slot`. let head_distance = current_slot - head_slot; @@ -101,10 +95,9 @@ pub fn spawn_notifier( "head_block" => format!("{}", head_root), "head_slot" => head_slot, "current_slot" => current_slot, - "sync_state" =>format!("{}", sync_state) + "sync_state" =>format!("{}", sync_state) ); - // Log if we are syncing if sync_state.is_syncing() { let distance = format!( @@ -122,9 +115,13 @@ pub fn spawn_notifier( ); } else { if sync_state.is_synced() { - let block_info = if current_slot > head_slot { format!(" … empty") } else { format!("{}", head_root) }; + let block_info = if current_slot > head_slot { + format!(" … empty") + } else { + format!("{}", head_root) + }; info!( - log_2, + log, "Synced"; "peers" => peer_count_pretty(connected_peer_count), "finalized_root" => format!("{}", finalized_root), @@ -135,7 +132,7 @@ pub fn spawn_notifier( ); } else { info!( - log_2, + log, "Searching for peers"; "peers" => peer_count_pretty(connected_peer_count), "finalized_root" => format!("{}", finalized_root), @@ -145,25 +142,14 @@ pub fn spawn_notifier( ); } } - Ok(()) - }) - .then(move |result| { - match result { - Ok(()) => Ok(()), - Err(e) => { - error!( - log_3, - "Notifier failed to notify"; - "error" => format!("{:?}", e) - ); - Ok(()) - } } }); + } + Ok::<(), ()>(()) + }; let (exit_signal, exit) = tokio::sync::oneshot::channel(); - context - .executor - .spawn(interval_future.select(exit).map(|_| ()).map_err(|_| ())); + // run the notifier on the current executor + tokio::spawn(futures::future::select(Box::pin(interval_future), exit)); Ok(exit_signal) } diff --git a/lcli/src/generate_bootnode_enr.rs b/lcli/src/generate_bootnode_enr.rs index 27f5dda278..3fc259e634 100644 --- a/lcli/src/generate_bootnode_enr.rs +++ b/lcli/src/generate_bootnode_enr.rs @@ -3,7 +3,6 @@ use eth2_libp2p::{ discovery::{build_enr, CombinedKey, CombinedKeyExt, Keypair, ENR_FILENAME}, NetworkConfig, NETWORK_KEY_FILENAME, }; -use std::convert::TryInto; use std::fs; use std::fs::File; use std::io::Write;