mirror of
https://github.com/sigp/lighthouse.git
synced 2026-07-01 11:54:40 +00:00
Update notifier to stable futures
This commit is contained in:
@@ -366,12 +366,16 @@ where
|
|||||||
.ok_or_else(|| "slot_notifier requires a chain spec".to_string())?
|
.ok_or_else(|| "slot_notifier requires a chain spec".to_string())?
|
||||||
.milliseconds_per_slot;
|
.milliseconds_per_slot;
|
||||||
|
|
||||||
let exit_channel = spawn_notifier(
|
let exit_channel = context
|
||||||
context,
|
.runtime_handle
|
||||||
|
.enter(|| {
|
||||||
|
spawn_notifier(
|
||||||
beacon_chain,
|
beacon_chain,
|
||||||
network_globals,
|
network_globals,
|
||||||
milliseconds_per_slot,
|
milliseconds_per_slot,
|
||||||
|
context.log.clone(),
|
||||||
)
|
)
|
||||||
|
})
|
||||||
.map_err(|e| format!("Unable to start slot notifier: {}", e))?;
|
.map_err(|e| format!("Unable to start slot notifier: {}", e))?;
|
||||||
|
|
||||||
self.exit_channels.push(exit_channel);
|
self.exit_channels.push(exit_channel);
|
||||||
|
|||||||
@@ -1,14 +1,12 @@
|
|||||||
use crate::metrics;
|
use crate::metrics;
|
||||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||||
use environment::RuntimeContext;
|
|
||||||
use eth2_libp2p::NetworkGlobals;
|
use eth2_libp2p::NetworkGlobals;
|
||||||
use futures::{Future, Stream};
|
use futures::prelude::*;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use slog::{debug, error, info, warn};
|
use slog::{debug, error, info, warn};
|
||||||
use slot_clock::SlotClock;
|
use slot_clock::SlotClock;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use tokio::timer::Interval;
|
|
||||||
use types::{EthSpec, Slot};
|
use types::{EthSpec, Slot};
|
||||||
|
|
||||||
/// Create a warning log whenever the peer count is at or below this value.
|
/// Create a warning log whenever the peer count is at or below this value.
|
||||||
@@ -27,15 +25,11 @@ const SPEEDO_OBSERVATIONS: usize = 4;
|
|||||||
|
|
||||||
/// Spawns a notifier service which periodically logs information about the node.
|
/// Spawns a notifier service which periodically logs information about the node.
|
||||||
pub fn spawn_notifier<T: BeaconChainTypes>(
|
pub fn spawn_notifier<T: BeaconChainTypes>(
|
||||||
context: RuntimeContext<T::EthSpec>,
|
|
||||||
beacon_chain: Arc<BeaconChain<T>>,
|
beacon_chain: Arc<BeaconChain<T>>,
|
||||||
network: Arc<NetworkGlobals<T::EthSpec>>,
|
network: Arc<NetworkGlobals<T::EthSpec>>,
|
||||||
milliseconds_per_slot: u64,
|
milliseconds_per_slot: u64,
|
||||||
|
log: slog::Logger,
|
||||||
) -> Result<tokio::sync::oneshot::Sender<()>, String> {
|
) -> Result<tokio::sync::oneshot::Sender<()>, String> {
|
||||||
let log_1 = context.log.clone();
|
|
||||||
let log_2 = context.log.clone();
|
|
||||||
let log_3 = context.log.clone();
|
|
||||||
|
|
||||||
let slot_duration = Duration::from_millis(milliseconds_per_slot);
|
let slot_duration = Duration::from_millis(milliseconds_per_slot);
|
||||||
let duration_to_next_slot = beacon_chain
|
let duration_to_next_slot = beacon_chain
|
||||||
.slot_clock
|
.slot_clock
|
||||||
@@ -43,29 +37,26 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
|
|||||||
.ok_or_else(|| "slot_notifier unable to determine time to next slot")?;
|
.ok_or_else(|| "slot_notifier unable to determine time to next slot")?;
|
||||||
|
|
||||||
// Run this half way through each slot.
|
// Run this half way through each slot.
|
||||||
let start_instant = Instant::now() + duration_to_next_slot + (slot_duration / 2);
|
let start_instant = tokio::time::Instant::now() + duration_to_next_slot + (slot_duration / 2);
|
||||||
|
|
||||||
// Run this each slot.
|
// Run this each slot.
|
||||||
let interval_duration = slot_duration;
|
let interval_duration = slot_duration;
|
||||||
|
|
||||||
let speedo = Mutex::new(Speedo::default());
|
let speedo = Mutex::new(Speedo::default());
|
||||||
|
let interval = tokio::time::interval_at(start_instant, interval_duration);
|
||||||
|
|
||||||
let interval_future = Interval::new(start_instant, interval_duration)
|
let interval_future = async move {
|
||||||
.map_err(
|
while let Some(_) = interval.next().await {
|
||||||
move |e| error!(log_1, "Slot notifier timer failed"; "error" => format!("{:?}", e)),
|
|
||||||
)
|
|
||||||
.for_each(move |_| {
|
|
||||||
let log = log_2.clone();
|
|
||||||
|
|
||||||
let connected_peer_count = network.connected_peers();
|
let connected_peer_count = network.connected_peers();
|
||||||
let sync_state = network.sync_state();
|
let sync_state = network.sync_state();
|
||||||
|
|
||||||
let head_info = beacon_chain.head_info()
|
let head_info = beacon_chain.head_info().map_err(|e| {
|
||||||
.map_err(|e| error!(
|
error!(
|
||||||
log,
|
log,
|
||||||
"Failed to get beacon chain head info";
|
"Failed to get beacon chain head info";
|
||||||
"error" => format!("{:?}", e)
|
"error" => format!("{:?}", e)
|
||||||
))?;
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
let head_slot = head_info.slot;
|
let head_slot = head_info.slot;
|
||||||
let current_slot = beacon_chain.slot().map_err(|e| {
|
let current_slot = beacon_chain.slot().map_err(|e| {
|
||||||
@@ -83,7 +74,10 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
|
|||||||
let mut speedo = speedo.lock();
|
let mut speedo = speedo.lock();
|
||||||
speedo.observe(head_slot, Instant::now());
|
speedo.observe(head_slot, Instant::now());
|
||||||
|
|
||||||
metrics::set_gauge(&metrics::SYNC_SLOTS_PER_SECOND, speedo.slots_per_second().unwrap_or_else(|| 0_f64) as i64);
|
metrics::set_gauge(
|
||||||
|
&metrics::SYNC_SLOTS_PER_SECOND,
|
||||||
|
speedo.slots_per_second().unwrap_or_else(|| 0_f64) as i64,
|
||||||
|
);
|
||||||
|
|
||||||
// The next two lines take advantage of saturating subtraction on `Slot`.
|
// The next two lines take advantage of saturating subtraction on `Slot`.
|
||||||
let head_distance = current_slot - head_slot;
|
let head_distance = current_slot - head_slot;
|
||||||
@@ -104,7 +98,6 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
|
|||||||
"sync_state" =>format!("{}", sync_state)
|
"sync_state" =>format!("{}", sync_state)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
||||||
// Log if we are syncing
|
// Log if we are syncing
|
||||||
if sync_state.is_syncing() {
|
if sync_state.is_syncing() {
|
||||||
let distance = format!(
|
let distance = format!(
|
||||||
@@ -122,9 +115,13 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
|
|||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
if sync_state.is_synced() {
|
if sync_state.is_synced() {
|
||||||
let block_info = if current_slot > head_slot { format!(" … empty") } else { format!("{}", head_root) };
|
let block_info = if current_slot > head_slot {
|
||||||
|
format!(" … empty")
|
||||||
|
} else {
|
||||||
|
format!("{}", head_root)
|
||||||
|
};
|
||||||
info!(
|
info!(
|
||||||
log_2,
|
log,
|
||||||
"Synced";
|
"Synced";
|
||||||
"peers" => peer_count_pretty(connected_peer_count),
|
"peers" => peer_count_pretty(connected_peer_count),
|
||||||
"finalized_root" => format!("{}", finalized_root),
|
"finalized_root" => format!("{}", finalized_root),
|
||||||
@@ -135,7 +132,7 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
|
|||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
info!(
|
info!(
|
||||||
log_2,
|
log,
|
||||||
"Searching for peers";
|
"Searching for peers";
|
||||||
"peers" => peer_count_pretty(connected_peer_count),
|
"peers" => peer_count_pretty(connected_peer_count),
|
||||||
"finalized_root" => format!("{}", finalized_root),
|
"finalized_root" => format!("{}", finalized_root),
|
||||||
@@ -145,25 +142,14 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
}
|
||||||
})
|
Ok::<(), ()>(())
|
||||||
.then(move |result| {
|
};
|
||||||
match result {
|
|
||||||
Ok(()) => Ok(()),
|
|
||||||
Err(e) => {
|
|
||||||
error!(
|
|
||||||
log_3,
|
|
||||||
"Notifier failed to notify";
|
|
||||||
"error" => format!("{:?}", e)
|
|
||||||
);
|
|
||||||
Ok(())
|
|
||||||
} } });
|
|
||||||
|
|
||||||
let (exit_signal, exit) = tokio::sync::oneshot::channel();
|
let (exit_signal, exit) = tokio::sync::oneshot::channel();
|
||||||
|
|
||||||
context
|
// run the notifier on the current executor
|
||||||
.executor
|
tokio::spawn(futures::future::select(Box::pin(interval_future), exit));
|
||||||
.spawn(interval_future.select(exit).map(|_| ()).map_err(|_| ()));
|
|
||||||
|
|
||||||
Ok(exit_signal)
|
Ok(exit_signal)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,7 +3,6 @@ use eth2_libp2p::{
|
|||||||
discovery::{build_enr, CombinedKey, CombinedKeyExt, Keypair, ENR_FILENAME},
|
discovery::{build_enr, CombinedKey, CombinedKeyExt, Keypair, ENR_FILENAME},
|
||||||
NetworkConfig, NETWORK_KEY_FILENAME,
|
NetworkConfig, NETWORK_KEY_FILENAME,
|
||||||
};
|
};
|
||||||
use std::convert::TryInto;
|
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
|
|||||||
Reference in New Issue
Block a user