Further updates

This commit is contained in:
Age Manning
2020-05-08 16:07:31 +10:00
parent c4d5af81df
commit d54356036b
3 changed files with 11 additions and 18 deletions

View File

@@ -285,8 +285,8 @@ impl<T: EthSpec, S: Store<T>> CachingEth1Backend<T, S> {
} }
/// Starts the routine which connects to the external eth1 node and updates the caches. /// Starts the routine which connects to the external eth1 node and updates the caches.
pub async fn start(&self, exit: tokio::sync::oneshot::Receiver<()>) { fn start(&self, exit: tokio::sync::oneshot::Receiver<()>) {
tokio::spawn(async move { self.core.auto_update(exit).await }); tokio::spawn(HttpService::auto_update(self.core.clone(), exit));
} }
/// Instantiates `self` from an existing service. /// Instantiates `self` from an existing service.

View File

@@ -657,7 +657,7 @@ where
}; };
// Starts the service that connects to an eth1 node and periodically updates caches. // Starts the service that connects to an eth1 node and periodically updates caches.
context.runtime_handle.spawn(backend.start(exit)); context.runtime_handle.enter(|| backend.start(exit));
self.beacon_chain_builder = Some(beacon_chain_builder.eth1_backend(Some(backend))); self.beacon_chain_builder = Some(beacon_chain_builder.eth1_backend(Some(backend)));

View File

@@ -6,18 +6,13 @@ use crate::{
inner::{DepositUpdater, Inner}, inner::{DepositUpdater, Inner},
DepositLog, DepositLog,
}; };
use futures::{ use futures::{future::TryFutureExt, stream, stream::TryStreamExt};
future::{FutureExt, TryFutureExt},
stream,
stream::TryStreamExt,
};
use parking_lot::{RwLock, RwLockReadGuard}; use parking_lot::{RwLock, RwLockReadGuard};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use slog::{debug, error, info, trace, Logger}; use slog::{debug, error, info, trace, Logger};
use std::ops::{Range, RangeInclusive}; use std::ops::{Range, RangeInclusive};
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::oneshot::error::TryRecvError;
use tokio::time::delay_for; use tokio::time::delay_for;
const STANDARD_TIMEOUT_MILLIS: u64 = 15_000; const STANDARD_TIMEOUT_MILLIS: u64 = 15_000;
@@ -290,35 +285,33 @@ impl Service {
/// - Err(_) if there is an error. /// - Err(_) if there is an error.
/// ///
/// Emits logs for debugging and errors. /// Emits logs for debugging and errors.
pub async fn auto_update(&self, exit: tokio::sync::oneshot::Receiver<()>) { pub async fn auto_update(service: Self, mut exit: tokio::sync::oneshot::Receiver<()>) {
let update_interval = Duration::from_millis(self.config().auto_update_interval_millis); let update_interval = Duration::from_millis(service.config().auto_update_interval_millis);
loop { loop {
let update_future = async move { let update_future = async {
/* let update_result = service.update().await;
let update_result = self.update().await;
match update_result { match update_result {
Err(e) => error!( Err(e) => error!(
self.log, service.log,
"Failed to update eth1 cache"; "Failed to update eth1 cache";
"retry_millis" => update_interval.as_millis(), "retry_millis" => update_interval.as_millis(),
"error" => e, "error" => e,
), ),
Ok((deposit, block)) => debug!( Ok((deposit, block)) => debug!(
self.log, service.log,
"Updated eth1 cache"; "Updated eth1 cache";
"retry_millis" => update_interval.as_millis(), "retry_millis" => update_interval.as_millis(),
"blocks" => format!("{:?}", block), "blocks" => format!("{:?}", block),
"deposits" => format!("{:?}", deposit), "deposits" => format!("{:?}", deposit),
), ),
}; };
*/
// WARNING: delay_for doesn't return an error and panics on error. // WARNING: delay_for doesn't return an error and panics on error.
delay_for(update_interval).await; delay_for(update_interval).await;
}; };
if let futures::future::Either::Right(_) = if let futures::future::Either::Right(_) =
futures::future::select(Box::pin(update_future), futures::future::ready(())).await futures::future::select(Box::pin(update_future), &mut exit).await
{ {
// the exit future returned end // the exit future returned end
break; break;