Fix timer

This commit is contained in:
pawan
2020-04-21 01:56:03 +05:30
parent 1475302c44
commit 1fa6b49136

View File

@@ -3,19 +3,21 @@
//! This service allows task execution on the beacon node for various functionality. //! This service allows task execution on the beacon node for various functionality.
use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_chain::{BeaconChain, BeaconChainTypes};
use futures::stream::{StreamExt}; use futures::stream::StreamExt;
use futures::{future, prelude::*};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::oneshot::error::TryRecvError; use tokio::runtime::Handle;
use tokio::time::{interval_at, Instant}; use tokio::time::{interval_at, Instant};
/// Spawns a timer service which periodically executes tasks for the beacon chain /// Spawns a timer service which periodically executes tasks for the beacon chain
pub async fn spawn<T: BeaconChainTypes>( pub async fn spawn<T: BeaconChainTypes>(
handle: &Handle,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
milliseconds_per_slot: u64, milliseconds_per_slot: u64,
) -> Result<tokio::sync::oneshot::Sender<()>, &'static str> { ) -> Result<tokio::sync::oneshot::Sender<()>, &'static str> {
let (exit_signal, mut exit) = tokio::sync::oneshot::channel(); let (exit_signal, exit) = tokio::sync::oneshot::channel();
let start_instant = Instant::now() let start_instant = Instant::now()
+ beacon_chain + beacon_chain
@@ -23,19 +25,15 @@ pub async fn spawn<T: BeaconChainTypes>(
.duration_to_next_slot() .duration_to_next_slot()
.ok_or_else(|| "slot_notifier unable to determine time to next slot")?; .ok_or_else(|| "slot_notifier unable to determine time to next slot")?;
// Warning: `interval_at` panics on error // Warning: `interval_at` panics if `milliseconds_per_slot` = 0.
let mut timer_future = interval_at(start_instant, Duration::from_millis(milliseconds_per_slot)); let timer_future = interval_at(start_instant, Duration::from_millis(milliseconds_per_slot))
let timer_future = async move { .for_each(move |_| {
while let Some(_) = timer_future.next().await {
beacon_chain.per_slot_task(); beacon_chain.per_slot_task();
match exit.try_recv() { future::ready(())
Ok(_) | Err(TryRecvError::Closed) => break, });
Err(TryRecvError::Empty) => {}
}
}
};
tokio::task::spawn(timer_future); let future = futures::future::select(timer_future, exit.map_err(|_| ()).map(|_| ()));
handle.spawn(future);
Ok(exit_signal) Ok(exit_signal)
} }