Convert self referential async functions

This commit is contained in:
Age Manning
2020-05-08 16:19:55 +10:00
parent d54356036b
commit 4617db64fa

View File

@@ -239,34 +239,32 @@ impl Service {
/// ///
/// Emits logs for debugging and errors. /// Emits logs for debugging and errors.
pub async fn update( pub async fn update(
&self, service: Self,
) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> { ) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> {
let update_deposit_cache = async { let update_deposit_cache = async {
let outcome = self let outcome = Service::update_deposit_cache(service.clone())
.update_deposit_cache()
.await .await
.map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?; .map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?;
trace!( trace!(
self.log, service.log,
"Updated eth1 deposit cache"; "Updated eth1 deposit cache";
"cached_deposits" => self.inner.deposit_cache.read().cache.len(), "cached_deposits" => service.inner.deposit_cache.read().cache.len(),
"logs_imported" => outcome.logs_imported, "logs_imported" => outcome.logs_imported,
"last_processed_eth1_block" => self.inner.deposit_cache.read().last_processed_block, "last_processed_eth1_block" => service.inner.deposit_cache.read().last_processed_block,
); );
Ok(outcome) Ok(outcome)
}; };
let update_block_cache = async { let update_block_cache = async {
let outcome = self let outcome = Service::update_block_cache(service.clone())
.update_block_cache()
.await .await
.map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?; .map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?;
trace!( trace!(
self.log, service.log,
"Updated eth1 block cache"; "Updated eth1 block cache";
"cached_blocks" => self.inner.block_cache.read().len(), "cached_blocks" => service.inner.block_cache.read().len(),
"blocks_imported" => outcome.blocks_imported, "blocks_imported" => outcome.blocks_imported,
"head_block" => outcome.head_block_number, "head_block" => outcome.head_block_number,
); );
@@ -290,7 +288,7 @@ impl Service {
loop { loop {
let update_future = async { let update_future = async {
let update_result = service.update().await; let update_result = Service::update(service.clone()).await;
match update_result { match update_result {
Err(e) => error!( Err(e) => error!(
@@ -331,24 +329,24 @@ impl Service {
/// - Err(_) if there is an error. /// - Err(_) if there is an error.
/// ///
/// Emits logs for debugging and errors. /// Emits logs for debugging and errors.
pub async fn update_deposit_cache(&self) -> Result<DepositCacheUpdateOutcome, Error> { pub async fn update_deposit_cache(service: Self) -> Result<DepositCacheUpdateOutcome, Error> {
let blocks_per_log_query = self.config().blocks_per_log_query; let blocks_per_log_query = service.config().blocks_per_log_query;
let max_log_requests_per_update = self let max_log_requests_per_update = service
.config() .config()
.max_log_requests_per_update .max_log_requests_per_update
.unwrap_or_else(usize::max_value); .unwrap_or_else(usize::max_value);
let next_required_block = self let next_required_block = service
.deposits() .deposits()
.read() .read()
.last_processed_block .last_processed_block
.map(|n| n + 1) .map(|n| n + 1)
.unwrap_or_else(|| self.config().deposit_contract_deploy_block); .unwrap_or_else(|| service.config().deposit_contract_deploy_block);
let range = get_new_block_numbers( let range = get_new_block_numbers(
&self.config().endpoint, &service.config().endpoint,
next_required_block, next_required_block,
self.config().follow_distance, service.config().follow_distance,
) )
.await?; .await?;
@@ -368,13 +366,13 @@ impl Service {
}; };
let logs_imported = let logs_imported =
stream::try_unfold(block_number_chunks.into_iter(), |mut chunks| async move { stream::try_unfold(block_number_chunks.into_iter(), |mut chunks| async {
match chunks.next() { match chunks.next() {
Some(chunk) => { Some(chunk) => {
let chunk_1 = chunk.clone(); let chunk_1 = chunk.clone();
match get_deposit_logs_in_range( match get_deposit_logs_in_range(
&self.config().endpoint, &service.config().endpoint,
&self.config().deposit_contract_address, &service.config().deposit_contract_address,
chunk, chunk,
Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS), Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS),
) )
@@ -388,7 +386,7 @@ impl Service {
} }
}) })
.try_fold(0, |mut sum: usize, (block_range, log_chunk)| async move { .try_fold(0, |mut sum: usize, (block_range, log_chunk)| async move {
let mut cache = self.deposits().write(); let mut cache = service.deposits().write();
log_chunk log_chunk
.into_iter() .into_iter()
@@ -438,18 +436,18 @@ impl Service {
if logs_imported > 0 { if logs_imported > 0 {
info!( info!(
self.log, service.log,
"Imported deposit log(s)"; "Imported deposit log(s)";
"latest_block" => self.inner.deposit_cache.read().cache.latest_block_number(), "latest_block" => service.inner.deposit_cache.read().cache.latest_block_number(),
"total" => self.deposit_cache_len(), "total" => service.deposit_cache_len(),
"new" => logs_imported "new" => logs_imported
); );
} else { } else {
debug!( debug!(
self.log, service.log,
"No new deposits found"; "No new deposits found";
"latest_block" => self.inner.deposit_cache.read().cache.latest_block_number(), "latest_block" => service.inner.deposit_cache.read().cache.latest_block_number(),
"total_deposits" => self.deposit_cache_len(), "total_deposits" => service.deposit_cache_len(),
); );
} }
@@ -467,25 +465,25 @@ impl Service {
/// - Err(_) if there is an error. /// - Err(_) if there is an error.
/// ///
/// Emits logs for debugging and errors. /// Emits logs for debugging and errors.
pub async fn update_block_cache(&self) -> Result<BlockCacheUpdateOutcome, Error> { pub async fn update_block_cache(service: Self) -> Result<BlockCacheUpdateOutcome, Error> {
let block_cache_truncation = self.config().block_cache_truncation; let block_cache_truncation = service.config().block_cache_truncation;
let max_blocks_per_update = self let max_blocks_per_update = service
.config() .config()
.max_blocks_per_update .max_blocks_per_update
.unwrap_or_else(usize::max_value); .unwrap_or_else(usize::max_value);
let next_required_block = self let next_required_block = service
.inner .inner
.block_cache .block_cache
.read() .read()
.highest_block_number() .highest_block_number()
.map(|n| n + 1) .map(|n| n + 1)
.unwrap_or_else(|| self.config().lowest_cached_block_number); .unwrap_or_else(|| service.config().lowest_cached_block_number);
let range = get_new_block_numbers( let range = get_new_block_numbers(
&self.config().endpoint, &service.config().endpoint,
next_required_block, next_required_block,
self.config().follow_distance, service.config().follow_distance,
) )
.await?; .await?;
// Map the range of required blocks into a Vec. // Map the range of required blocks into a Vec.
@@ -507,7 +505,7 @@ impl Service {
// If the range of required blocks is larger than `max_size`, drop all // If the range of required blocks is larger than `max_size`, drop all
// existing blocks and download `max_size` count of blocks. // existing blocks and download `max_size` count of blocks.
let first_block = range.end() - max_size; let first_block = range.end() - max_size;
(*self.inner.block_cache.write()) = BlockCache::default(); (*service.inner.block_cache.write()) = BlockCache::default();
(first_block..=*range.end()).collect::<Vec<u64>>() (first_block..=*range.end()).collect::<Vec<u64>>()
} else { } else {
range.collect::<Vec<u64>>() range.collect::<Vec<u64>>()
@@ -518,7 +516,7 @@ impl Service {
}; };
// Download the range of blocks and sequentially import them into the cache. // Download the range of blocks and sequentially import them into the cache.
// Last processed block in deposit cache // Last processed block in deposit cache
let latest_in_cache = self let latest_in_cache = service
.inner .inner
.deposit_cache .deposit_cache
.read() .read()
@@ -538,7 +536,7 @@ impl Service {
|mut block_numbers| async move { |mut block_numbers| async move {
match block_numbers.next() { match block_numbers.next() {
Some(block_number) => { Some(block_number) => {
match download_eth1_block(self.inner.clone(), block_number).await { match download_eth1_block(service.inner.clone(), block_number).await {
Ok(eth1_block) => Ok(Some((eth1_block, block_numbers))), Ok(eth1_block) => Ok(Some((eth1_block, block_numbers))),
Err(e) => Err(e), Err(e) => Err(e),
} }
@@ -550,7 +548,8 @@ impl Service {
let blocks_imported = eth1_blocks let blocks_imported = eth1_blocks
.try_fold(0, |sum: usize, eth1_block| async move { .try_fold(0, |sum: usize, eth1_block| async move {
self.inner service
.inner
.block_cache .block_cache
.write() .write()
.insert_root_or_child(eth1_block) .insert_root_or_child(eth1_block)
@@ -558,11 +557,12 @@ impl Service {
metrics::set_gauge( metrics::set_gauge(
&metrics::BLOCK_CACHE_LEN, &metrics::BLOCK_CACHE_LEN,
self.inner.block_cache.read().len() as i64, service.inner.block_cache.read().len() as i64,
); );
metrics::set_gauge( metrics::set_gauge(
&metrics::LATEST_CACHED_BLOCK_TIMESTAMP, &metrics::LATEST_CACHED_BLOCK_TIMESTAMP,
self.inner service
.inner
.block_cache .block_cache
.read() .read()
.latest_block_timestamp() .latest_block_timestamp()
@@ -574,14 +574,14 @@ impl Service {
.await?; .await?;
// Prune the block cache, preventing it from growing too large. // Prune the block cache, preventing it from growing too large.
self.inner.prune_blocks(); service.inner.prune_blocks();
metrics::set_gauge( metrics::set_gauge(
&metrics::BLOCK_CACHE_LEN, &metrics::BLOCK_CACHE_LEN,
self.inner.block_cache.read().len() as i64, service.inner.block_cache.read().len() as i64,
); );
let block_cache = self.inner.block_cache.read(); let block_cache = service.inner.block_cache.read();
let latest_block_mins = block_cache let latest_block_mins = block_cache
.latest_block_timestamp() .latest_block_timestamp()
.and_then(|timestamp| { .and_then(|timestamp| {
@@ -595,7 +595,7 @@ impl Service {
if blocks_imported > 0 { if blocks_imported > 0 {
info!( info!(
self.log, service.log,
"Imported eth1 block(s)"; "Imported eth1 block(s)";
"latest_block_age" => latest_block_mins, "latest_block_age" => latest_block_mins,
"latest_block" => block_cache.highest_block_number(), "latest_block" => block_cache.highest_block_number(),
@@ -604,14 +604,14 @@ impl Service {
); );
} else { } else {
debug!( debug!(
self.log, service.log,
"No new eth1 blocks imported"; "No new eth1 blocks imported";
"latest_block" => block_cache.highest_block_number(), "latest_block" => block_cache.highest_block_number(),
"cached_blocks" => block_cache.len(), "cached_blocks" => block_cache.len(),
); );
} }
let block_cache = self.inner.block_cache.read(); let block_cache = service.inner.block_cache.read();
let latest_block_mins = block_cache let latest_block_mins = block_cache
.latest_block_timestamp() .latest_block_timestamp()
.and_then(|timestamp| { .and_then(|timestamp| {
@@ -625,7 +625,7 @@ impl Service {
if blocks_imported > 0 { if blocks_imported > 0 {
debug!( debug!(
self.log, service.log,
"Imported eth1 block(s)"; "Imported eth1 block(s)";
"latest_block_age" => latest_block_mins, "latest_block_age" => latest_block_mins,
"latest_block" => block_cache.highest_block_number(), "latest_block" => block_cache.highest_block_number(),
@@ -634,7 +634,7 @@ impl Service {
); );
} else { } else {
debug!( debug!(
self.log, service.log,
"No new eth1 blocks imported"; "No new eth1 blocks imported";
"latest_block" => block_cache.highest_block_number(), "latest_block" => block_cache.highest_block_number(),
"cached_blocks" => block_cache.len(), "cached_blocks" => block_cache.len(),
@@ -643,7 +643,7 @@ impl Service {
Ok(BlockCacheUpdateOutcome { Ok(BlockCacheUpdateOutcome {
blocks_imported, blocks_imported,
head_block_number: self.inner.block_cache.read().highest_block_number(), head_block_number: service.inner.block_cache.read().highest_block_number(),
}) })
} }
} }