mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-20 21:34:46 +00:00
stable futures fixes (#1124)
* Fix eth1 update functions * Fix genesis and client * Fix beacon node lib * Return appropriate runtimes from environment * Fix test rig * Refactor eth1 service update
This commit is contained in:
@@ -2,18 +2,18 @@ use crate::metrics;
|
||||
use crate::{
|
||||
block_cache::{BlockCache, Error as BlockCacheError, Eth1Block},
|
||||
deposit_cache::Error as DepositCacheError,
|
||||
http::{get_block, get_block_number, get_deposit_logs_in_range},
|
||||
http::{get_block, get_block_number, get_deposit_logs_in_range, Log},
|
||||
inner::{DepositUpdater, Inner},
|
||||
DepositLog,
|
||||
};
|
||||
use futures::{future::TryFutureExt, stream, stream::TryStreamExt};
|
||||
use futures::{future::TryFutureExt, stream, stream::TryStreamExt, StreamExt};
|
||||
use parking_lot::{RwLock, RwLockReadGuard};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use slog::{debug, error, info, trace, Logger};
|
||||
use std::ops::{Range, RangeInclusive};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
use tokio::time::delay_for;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use tokio::time::{interval_at, Duration, Instant};
|
||||
|
||||
const STANDARD_TIMEOUT_MILLIS: u64 = 15_000;
|
||||
|
||||
@@ -283,38 +283,38 @@ impl Service {
|
||||
/// - Err(_) if there is an error.
|
||||
///
|
||||
/// Emits logs for debugging and errors.
|
||||
pub async fn auto_update(service: Self, mut exit: tokio::sync::oneshot::Receiver<()>) {
|
||||
pub fn auto_update(service: Self, exit: tokio::sync::oneshot::Receiver<()>) {
|
||||
let update_interval = Duration::from_millis(service.config().auto_update_interval_millis);
|
||||
|
||||
loop {
|
||||
let update_future = async {
|
||||
let update_result = Service::update(service.clone()).await;
|
||||
let interval = interval_at(Instant::now(), update_interval);
|
||||
|
||||
match update_result {
|
||||
Err(e) => error!(
|
||||
service.log,
|
||||
"Failed to update eth1 cache";
|
||||
"retry_millis" => update_interval.as_millis(),
|
||||
"error" => e,
|
||||
),
|
||||
Ok((deposit, block)) => debug!(
|
||||
service.log,
|
||||
"Updated eth1 cache";
|
||||
"retry_millis" => update_interval.as_millis(),
|
||||
"blocks" => format!("{:?}", block),
|
||||
"deposits" => format!("{:?}", deposit),
|
||||
),
|
||||
};
|
||||
// WARNING: delay_for doesn't return an error and panics on error.
|
||||
delay_for(update_interval).await;
|
||||
};
|
||||
if let futures::future::Either::Right(_) =
|
||||
futures::future::select(Box::pin(update_future), &mut exit).await
|
||||
{
|
||||
// the exit future returned end
|
||||
break;
|
||||
}
|
||||
}
|
||||
let update_future = interval.for_each(move |_| {
|
||||
let _ = Service::do_update(service.clone(), update_interval);
|
||||
futures::future::ready(())
|
||||
});
|
||||
let future = futures::future::select(update_future, exit);
|
||||
tokio::task::spawn(future);
|
||||
}
|
||||
|
||||
async fn do_update(service: Self, update_interval: Duration) -> Result<(), ()> {
|
||||
let update_result = Service::update(service.clone()).await;
|
||||
println!("Going on");
|
||||
match update_result {
|
||||
Err(e) => error!(
|
||||
service.log,
|
||||
"Failed to update eth1 cache";
|
||||
"retry_millis" => update_interval.as_millis(),
|
||||
"error" => e,
|
||||
),
|
||||
Ok((deposit, block)) => debug!(
|
||||
service.log,
|
||||
"Updated eth1 cache";
|
||||
"retry_millis" => update_interval.as_millis(),
|
||||
"blocks" => format!("{:?}", block),
|
||||
"deposits" => format!("{:?}", deposit),
|
||||
),
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Contacts the remote eth1 node and attempts to import deposit logs up to the configured
|
||||
@@ -365,7 +365,7 @@ impl Service {
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
let logs_imported =
|
||||
let logs: Vec<(Range<u64>, Vec<Log>)> =
|
||||
stream::try_unfold(block_number_chunks.into_iter(), |mut chunks| async {
|
||||
match chunks.next() {
|
||||
Some(chunk) => {
|
||||
@@ -385,55 +385,53 @@ impl Service {
|
||||
None => Ok(None),
|
||||
}
|
||||
})
|
||||
.try_fold(0, |mut sum: usize, (block_range, log_chunk)| async move {
|
||||
let mut cache = service.deposits().write();
|
||||
|
||||
log_chunk
|
||||
.into_iter()
|
||||
.map(|raw_log| {
|
||||
DepositLog::from_log(&raw_log).map_err(|error| {
|
||||
Error::FailedToParseDepositLog {
|
||||
block_range: block_range.clone(),
|
||||
error,
|
||||
}
|
||||
})
|
||||
})
|
||||
// Return early if any of the logs cannot be parsed.
|
||||
//
|
||||
// This costs an additional `collect`, however it enforces that no logs are
|
||||
// imported if any one of them cannot be parsed.
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
.into_iter()
|
||||
.map(|deposit_log| {
|
||||
cache
|
||||
.cache
|
||||
.insert_log(deposit_log)
|
||||
.map_err(Error::FailedToInsertDeposit)?;
|
||||
|
||||
sum += 1;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
// Returns if a deposit is unable to be added to the cache.
|
||||
//
|
||||
// If this error occurs, the cache will no longer be guaranteed to hold either
|
||||
// none or all of the logs for each block (i.e., they may exist _some_ logs for
|
||||
// a block, but not _all_ logs for that block). This scenario can cause the
|
||||
// node to choose an invalid genesis state or propose an invalid block.
|
||||
.collect::<Result<_, _>>()?;
|
||||
|
||||
cache.last_processed_block = Some(block_range.end.saturating_sub(1));
|
||||
|
||||
metrics::set_gauge(&metrics::DEPOSIT_CACHE_LEN, cache.cache.len() as i64);
|
||||
metrics::set_gauge(
|
||||
&metrics::HIGHEST_PROCESSED_DEPOSIT_BLOCK,
|
||||
cache.last_processed_block.unwrap_or_else(|| 0) as i64,
|
||||
);
|
||||
|
||||
Ok(sum)
|
||||
})
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
||||
let mut logs_imported = 0;
|
||||
for (block_range, log_chunk) in logs.iter() {
|
||||
let mut cache = service.deposits().write();
|
||||
log_chunk
|
||||
.into_iter()
|
||||
.map(|raw_log| {
|
||||
DepositLog::from_log(&raw_log).map_err(|error| Error::FailedToParseDepositLog {
|
||||
block_range: block_range.clone(),
|
||||
error,
|
||||
})
|
||||
})
|
||||
// Return early if any of the logs cannot be parsed.
|
||||
//
|
||||
// This costs an additional `collect`, however it enforces that no logs are
|
||||
// imported if any one of them cannot be parsed.
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
.into_iter()
|
||||
.map(|deposit_log| {
|
||||
cache
|
||||
.cache
|
||||
.insert_log(deposit_log)
|
||||
.map_err(Error::FailedToInsertDeposit)?;
|
||||
|
||||
logs_imported += 1;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
// Returns if a deposit is unable to be added to the cache.
|
||||
//
|
||||
// If this error occurs, the cache will no longer be guaranteed to hold either
|
||||
// none or all of the logs for each block (i.e., they may exist _some_ logs for
|
||||
// a block, but not _all_ logs for that block). This scenario can cause the
|
||||
// node to choose an invalid genesis state or propose an invalid block.
|
||||
.collect::<Result<_, _>>()?;
|
||||
|
||||
cache.last_processed_block = Some(block_range.end.saturating_sub(1));
|
||||
|
||||
metrics::set_gauge(&metrics::DEPOSIT_CACHE_LEN, cache.cache.len() as i64);
|
||||
metrics::set_gauge(
|
||||
&metrics::HIGHEST_PROCESSED_DEPOSIT_BLOCK,
|
||||
cache.last_processed_block.unwrap_or_else(|| 0) as i64,
|
||||
);
|
||||
}
|
||||
|
||||
if logs_imported > 0 {
|
||||
info!(
|
||||
service.log,
|
||||
@@ -531,9 +529,9 @@ impl Service {
|
||||
// Produce a stream from the list of required block numbers and return a future that
|
||||
// consumes the it.
|
||||
|
||||
let eth1_blocks = stream::try_unfold(
|
||||
let eth1_blocks: Vec<Eth1Block> = stream::try_unfold(
|
||||
required_block_numbers.into_iter(),
|
||||
|mut block_numbers| async move {
|
||||
|mut block_numbers| async {
|
||||
match block_numbers.next() {
|
||||
Some(block_number) => {
|
||||
match download_eth1_block(service.inner.clone(), block_number).await {
|
||||
@@ -544,34 +542,35 @@ impl Service {
|
||||
None => Ok(None),
|
||||
}
|
||||
},
|
||||
);
|
||||
)
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
||||
let blocks_imported = eth1_blocks
|
||||
.try_fold(0, |sum: usize, eth1_block| async move {
|
||||
let mut blocks_imported = 0;
|
||||
for eth1_block in eth1_blocks {
|
||||
service
|
||||
.inner
|
||||
.block_cache
|
||||
.write()
|
||||
.insert_root_or_child(eth1_block)
|
||||
.map_err(Error::FailedToInsertEth1Block)?;
|
||||
|
||||
metrics::set_gauge(
|
||||
&metrics::BLOCK_CACHE_LEN,
|
||||
service.inner.block_cache.read().len() as i64,
|
||||
);
|
||||
metrics::set_gauge(
|
||||
&metrics::LATEST_CACHED_BLOCK_TIMESTAMP,
|
||||
service
|
||||
.inner
|
||||
.block_cache
|
||||
.write()
|
||||
.insert_root_or_child(eth1_block)
|
||||
.map_err(Error::FailedToInsertEth1Block)?;
|
||||
.read()
|
||||
.latest_block_timestamp()
|
||||
.unwrap_or_else(|| 0) as i64,
|
||||
);
|
||||
|
||||
metrics::set_gauge(
|
||||
&metrics::BLOCK_CACHE_LEN,
|
||||
service.inner.block_cache.read().len() as i64,
|
||||
);
|
||||
metrics::set_gauge(
|
||||
&metrics::LATEST_CACHED_BLOCK_TIMESTAMP,
|
||||
service
|
||||
.inner
|
||||
.block_cache
|
||||
.read()
|
||||
.latest_block_timestamp()
|
||||
.unwrap_or_else(|| 0) as i64,
|
||||
);
|
||||
|
||||
Ok(sum + 1)
|
||||
})
|
||||
.await?;
|
||||
blocks_imported += 1;
|
||||
}
|
||||
|
||||
// Prune the block cache, preventing it from growing too large.
|
||||
service.inner.prune_blocks();
|
||||
|
||||
Reference in New Issue
Block a user