diff --git a/beacon_node/beacon_chain/src/eth1_chain.rs b/beacon_node/beacon_chain/src/eth1_chain.rs index a18ae5f2e2..1020fc0826 100644 --- a/beacon_node/beacon_chain/src/eth1_chain.rs +++ b/beacon_node/beacon_chain/src/eth1_chain.rs @@ -285,8 +285,8 @@ impl> CachingEth1Backend { } /// Starts the routine which connects to the external eth1 node and updates the caches. - pub async fn start(&self, exit: tokio::sync::oneshot::Receiver<()>) { - tokio::spawn(async move { self.core.auto_update(exit).await }); + fn start(&self, exit: tokio::sync::oneshot::Receiver<()>) { + tokio::spawn(HttpService::auto_update(self.core.clone(), exit)); } /// Instantiates `self` from an existing service. diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index c25de43c13..2483e70a98 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -657,7 +657,7 @@ where }; // Starts the service that connects to an eth1 node and periodically updates caches. - context.runtime_handle.spawn(backend.start(exit)); + context.runtime_handle.enter(|| backend.start(exit)); self.beacon_chain_builder = Some(beacon_chain_builder.eth1_backend(Some(backend))); diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index 3b21ac2e2a..5ddeae1f21 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -6,18 +6,13 @@ use crate::{ inner::{DepositUpdater, Inner}, DepositLog, }; -use futures::{ - future::{FutureExt, TryFutureExt}, - stream, - stream::TryStreamExt, -}; +use futures::{future::TryFutureExt, stream, stream::TryStreamExt}; use parking_lot::{RwLock, RwLockReadGuard}; use serde::{Deserialize, Serialize}; use slog::{debug, error, info, trace, Logger}; use std::ops::{Range, RangeInclusive}; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use tokio::sync::oneshot::error::TryRecvError; use tokio::time::delay_for; const STANDARD_TIMEOUT_MILLIS: u64 = 15_000; @@ -290,35 +285,33 @@ impl Service { /// - Err(_) if there is an error. /// /// Emits logs for debugging and errors. - pub async fn auto_update(&self, exit: tokio::sync::oneshot::Receiver<()>) { - let update_interval = Duration::from_millis(self.config().auto_update_interval_millis); + pub async fn auto_update(service: Self, mut exit: tokio::sync::oneshot::Receiver<()>) { + let update_interval = Duration::from_millis(service.config().auto_update_interval_millis); loop { - let update_future = async move { - /* - let update_result = self.update().await; + let update_future = async { + let update_result = service.update().await; match update_result { Err(e) => error!( - self.log, + service.log, "Failed to update eth1 cache"; "retry_millis" => update_interval.as_millis(), "error" => e, ), Ok((deposit, block)) => debug!( - self.log, + service.log, "Updated eth1 cache"; "retry_millis" => update_interval.as_millis(), "blocks" => format!("{:?}", block), "deposits" => format!("{:?}", deposit), ), }; - */ // WARNING: delay_for doesn't return an error and panics on error. delay_for(update_interval).await; }; if let futures::future::Either::Right(_) = - futures::future::select(Box::pin(update_future), futures::future::ready(())).await + futures::future::select(Box::pin(update_future), &mut exit).await { // the exit future returned end break;