From 1475302c443844a4703705d9d2edc3ac9c043cd0 Mon Sep 17 00:00:00 2001 From: pawan Date: Mon, 20 Apr 2020 19:53:10 +0530 Subject: [PATCH] Port timer to stable futures --- beacon_node/timer/Cargo.toml | 4 ++-- beacon_node/timer/src/lib.rs | 43 ++++++++++++++---------------------- 2 files changed, 19 insertions(+), 28 deletions(-) diff --git a/beacon_node/timer/Cargo.toml b/beacon_node/timer/Cargo.toml index a0c132cbe1..7a2e2ec4a2 100644 --- a/beacon_node/timer/Cargo.toml +++ b/beacon_node/timer/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" beacon_chain = { path = "../beacon_chain" } types = { path = "../../eth2/types" } slot_clock = { path = "../../eth2/utils/slot_clock" } -tokio = "0.1.22" +tokio = { version = "0.2", features = ["full"] } slog = "2.5.2" parking_lot = "0.10.0" -futures = "0.1.29" +futures = "0.3" diff --git a/beacon_node/timer/src/lib.rs b/beacon_node/timer/src/lib.rs index 20054d8540..c0d8e28ebc 100644 --- a/beacon_node/timer/src/lib.rs +++ b/beacon_node/timer/src/lib.rs @@ -3,22 +3,19 @@ //! This service allows task execution on the beacon node for various functionality. use beacon_chain::{BeaconChain, BeaconChainTypes}; -use futures::{future, prelude::*}; -use slog::error; +use futures::stream::{StreamExt}; use slot_clock::SlotClock; use std::sync::Arc; -use std::time::{Duration, Instant}; -use tokio::runtime::TaskExecutor; -use tokio::timer::Interval; +use std::time::Duration; +use tokio::sync::oneshot::error::TryRecvError; +use tokio::time::{interval_at, Instant}; /// Spawns a timer service which periodically executes tasks for the beacon chain -pub fn spawn( - executor: &TaskExecutor, +pub async fn spawn( beacon_chain: Arc>, milliseconds_per_slot: u64, - log: slog::Logger, ) -> Result, &'static str> { - let (exit_signal, exit) = tokio::sync::oneshot::channel(); + let (exit_signal, mut exit) = tokio::sync::oneshot::channel(); let start_instant = Instant::now() + beacon_chain @@ -26,25 +23,19 @@ pub fn spawn( .duration_to_next_slot() .ok_or_else(|| "slot_notifier unable to determine time to next slot")?; - let timer_future = Interval::new(start_instant, Duration::from_millis(milliseconds_per_slot)) - .map_err(move |e| { - error!( - log, - "Beacon chain timer failed"; - "error" => format!("{:?}", e) - ) - }) - .for_each(move |_| { + // Warning: `interval_at` panics on error + let mut timer_future = interval_at(start_instant, Duration::from_millis(milliseconds_per_slot)); + let timer_future = async move { + while let Some(_) = timer_future.next().await { beacon_chain.per_slot_task(); - future::ok(()) - }); + match exit.try_recv() { + Ok(_) | Err(TryRecvError::Closed) => break, + Err(TryRecvError::Empty) => {} + } + } + }; - executor.spawn( - exit.map_err(|_| ()) - .select(timer_future) - .map(|_| ()) - .map_err(|_| ()), - ); + tokio::task::spawn(timer_future); Ok(exit_signal) }