Merge latest master

This commit is contained in:
Age Manning
2020-05-04 15:43:49 +10:00
27 changed files with 913 additions and 545 deletions

View File

@@ -13,10 +13,10 @@ use futures::{
};
use parking_lot::{RwLock, RwLockReadGuard};
use serde::{Deserialize, Serialize};
use slog::{debug, error, trace, Logger};
use slog::{debug, error, info, trace, Logger};
use std::ops::{Range, RangeInclusive};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::oneshot::error::TryRecvError;
use tokio::time::delay_for;
@@ -63,19 +63,15 @@ pub enum Error {
/// The success message for an Eth1Data cache update.
#[derive(Debug, PartialEq, Clone)]
pub enum BlockCacheUpdateOutcome {
/// The cache was successfully updated.
Success {
blocks_imported: usize,
head_block_number: Option<u64>,
},
pub struct BlockCacheUpdateOutcome {
pub blocks_imported: usize,
pub head_block_number: Option<u64>,
}
/// The success message for an Eth1 deposit cache update.
#[derive(Debug, PartialEq, Clone)]
pub enum DepositCacheUpdateOutcome {
/// The cache was successfully updated.
Success { logs_imported: usize },
pub struct DepositCacheUpdateOutcome {
pub logs_imported: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -255,7 +251,7 @@ impl Service {
.map_err(|e| format!("Failed to update eth1 cache: {:?}", e))
.then(|result| async move{
match &result {
Ok(DepositCacheUpdateOutcome::Success { logs_imported }) => trace!(
Ok(DepositCacheUpdateOutcome { logs_imported }) => trace!(
self.log,
"Updated eth1 deposit cache";
"cached_deposits" => self.inner.deposit_cache.read().cache.len(),
@@ -277,7 +273,7 @@ impl Service {
.map_err(|e| format!("Failed to update eth1 cache: {:?}", e))
.then(|result| async move {
match &result {
Ok(BlockCacheUpdateOutcome::Success {
Ok(BlockCacheUpdateOutcome {
blocks_imported,
head_block_number,
}) => trace!(
@@ -393,76 +389,93 @@ impl Service {
Vec::new()
};
stream::try_unfold(block_number_chunks.into_iter(), |mut chunks| async move {
match chunks.next() {
Some(chunk) => {
let chunk_1 = chunk.clone();
match get_deposit_logs_in_range(
&self.config().endpoint,
&self.config().deposit_contract_address,
chunk,
Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS),
)
.await
{
Ok(logs) => Ok(Some(((chunk_1, logs), chunks))),
Err(e) => Err(Error::GetDepositLogsFailed(e)),
let logs_imported =
stream::try_unfold(block_number_chunks.into_iter(), |mut chunks| async move {
match chunks.next() {
Some(chunk) => {
let chunk_1 = chunk.clone();
match get_deposit_logs_in_range(
&self.config().endpoint,
&self.config().deposit_contract_address,
chunk,
Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS),
)
.await
{
Ok(logs) => Ok(Some(((chunk_1, logs), chunks))),
Err(e) => Err(Error::GetDepositLogsFailed(e)),
}
}
None => Ok(None),
}
None => Ok(None),
}
})
.try_fold(0, |mut sum: usize, (block_range, log_chunk)| async move {
let mut cache = self.deposits().write();
log_chunk
.into_iter()
.map(|raw_log| {
DepositLog::from_log(&raw_log).map_err(|error| Error::FailedToParseDepositLog {
block_range: block_range.clone(),
error,
})
})
// Return early if any of the logs cannot be parsed.
//
// This costs an additional `collect`, however it enforces that no logs are
// imported if any one of them cannot be parsed.
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.map(|deposit_log| {
cache
.cache
.insert_log(deposit_log)
.map_err(Error::FailedToInsertDeposit)?;
sum += 1;
Ok(())
})
// Returns if a deposit is unable to be added to the cache.
//
// If this error occurs, the cache will no longer be guaranteed to hold either
// none or all of the logs for each block (i.e., there may exist _some_ logs for
// a block, but not _all_ logs for that block). This scenario can cause the
// node to choose an invalid genesis state or propose an invalid block.
.collect::<Result<_, _>>()?;
cache.last_processed_block = Some(block_range.end.saturating_sub(1));
metrics::set_gauge(&metrics::DEPOSIT_CACHE_LEN, cache.cache.len() as i64);
metrics::set_gauge(
&metrics::HIGHEST_PROCESSED_DEPOSIT_BLOCK,
cache.last_processed_block.unwrap_or_else(|| 0) as i64,
);
Ok(sum)
})
.map(|logs_imported| {
Ok(DepositCacheUpdateOutcome::Success {
logs_imported: logs_imported?,
})
})
.await
.try_fold(0, |mut sum: usize, (block_range, log_chunk)| async move {
let mut cache = self.deposits().write();
log_chunk
.into_iter()
.map(|raw_log| {
DepositLog::from_log(&raw_log).map_err(|error| {
Error::FailedToParseDepositLog {
block_range: block_range.clone(),
error,
}
})
})
// Return early if any of the logs cannot be parsed.
//
// This costs an additional `collect`, however it enforces that no logs are
// imported if any one of them cannot be parsed.
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.map(|deposit_log| {
cache
.cache
.insert_log(deposit_log)
.map_err(Error::FailedToInsertDeposit)?;
sum += 1;
Ok(())
})
// Returns if a deposit is unable to be added to the cache.
//
// If this error occurs, the cache will no longer be guaranteed to hold either
// none or all of the logs for each block (i.e., there may exist _some_ logs for
// a block, but not _all_ logs for that block). This scenario can cause the
// node to choose an invalid genesis state or propose an invalid block.
.collect::<Result<_, _>>()?;
cache.last_processed_block = Some(block_range.end.saturating_sub(1));
metrics::set_gauge(&metrics::DEPOSIT_CACHE_LEN, cache.cache.len() as i64);
metrics::set_gauge(
&metrics::HIGHEST_PROCESSED_DEPOSIT_BLOCK,
cache.last_processed_block.unwrap_or_else(|| 0) as i64,
);
Ok(sum)
})
.await?;
if logs_imported > 0 {
info!(
self.log,
"Imported deposit log(s)";
"latest_block" => self.inner.deposit_cache.read().cache.latest_block_number(),
"total" => self.deposit_cache_len(),
"new" => logs_imported
);
} else {
debug!(
self.log,
"No new deposits found";
"latest_block" => self.inner.deposit_cache.read().cache.latest_block_number(),
"total_deposits" => self.deposit_cache_len(),
);
}
Ok(DepositCacheUpdateOutcome { logs_imported })
}
/// Contacts the remote eth1 node and attempts to import all blocks up to the configured
@@ -581,6 +594,7 @@ impl Service {
Ok(sum + 1)
})
.await?;
// Prune the block cache, preventing it from growing too large.
self.inner.prune_blocks();
@@ -589,7 +603,37 @@ impl Service {
self.inner.block_cache.read().len() as i64,
);
Ok(BlockCacheUpdateOutcome::Success {
let block_cache = self.inner.block_cache.read();
let latest_block_mins = block_cache
.latest_block_timestamp()
.and_then(|timestamp| {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.ok()
.and_then(|now| now.checked_sub(Duration::from_secs(timestamp)))
})
.map(|duration| format!("{} mins", duration.as_secs() / 60))
.unwrap_or_else(|| "n/a".into());
if blocks_imported > 0 {
info!(
self.log,
"Imported eth1 block(s)";
"latest_block_age" => latest_block_mins,
"latest_block" => block_cache.highest_block_number(),
"total_cached_blocks" => block_cache.len(),
"new" => blocks_imported
);
} else {
debug!(
self.log,
"No new eth1 blocks imported";
"latest_block" => block_cache.highest_block_number(),
"cached_blocks" => block_cache.len(),
);
}
Ok(BlockCacheUpdateOutcome {
blocks_imported,
head_block_number: self.inner.block_cache.read().highest_block_number(),
})