From 4617db64fac9fb6174a963450ed5973285a8582f Mon Sep 17 00:00:00 2001 From: Age Manning Date: Fri, 8 May 2020 16:19:55 +1000 Subject: [PATCH] Convert self referential async functions --- beacon_node/eth1/src/service.rs | 100 ++++++++++++++++---------------- 1 file changed, 50 insertions(+), 50 deletions(-) diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index 5ddeae1f21..bb83e66f10 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -239,34 +239,32 @@ impl Service { /// /// Emits logs for debugging and errors. pub async fn update( - &self, + service: Self, ) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> { let update_deposit_cache = async { - let outcome = self - .update_deposit_cache() + let outcome = Service::update_deposit_cache(service.clone()) .await .map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?; trace!( - self.log, + service.log, "Updated eth1 deposit cache"; - "cached_deposits" => self.inner.deposit_cache.read().cache.len(), + "cached_deposits" => service.inner.deposit_cache.read().cache.len(), "logs_imported" => outcome.logs_imported, - "last_processed_eth1_block" => self.inner.deposit_cache.read().last_processed_block, + "last_processed_eth1_block" => service.inner.deposit_cache.read().last_processed_block, ); Ok(outcome) }; let update_block_cache = async { - let outcome = self - .update_block_cache() + let outcome = Service::update_block_cache(service.clone()) .await .map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?; trace!( - self.log, + service.log, "Updated eth1 block cache"; - "cached_blocks" => self.inner.block_cache.read().len(), + "cached_blocks" => service.inner.block_cache.read().len(), "blocks_imported" => outcome.blocks_imported, "head_block" => outcome.head_block_number, ); @@ -290,7 +288,7 @@ impl Service { loop { let update_future = async { - let update_result = service.update().await; + let update_result = Service::update(service.clone()).await; match update_result { Err(e) => error!( @@ -331,24 +329,24 @@ impl Service { /// - Err(_) if there is an error. /// /// Emits logs for debugging and errors. - pub async fn update_deposit_cache(&self) -> Result { - let blocks_per_log_query = self.config().blocks_per_log_query; - let max_log_requests_per_update = self + pub async fn update_deposit_cache(service: Self) -> Result { + let blocks_per_log_query = service.config().blocks_per_log_query; + let max_log_requests_per_update = service .config() .max_log_requests_per_update .unwrap_or_else(usize::max_value); - let next_required_block = self + let next_required_block = service .deposits() .read() .last_processed_block .map(|n| n + 1) - .unwrap_or_else(|| self.config().deposit_contract_deploy_block); + .unwrap_or_else(|| service.config().deposit_contract_deploy_block); let range = get_new_block_numbers( - &self.config().endpoint, + &service.config().endpoint, next_required_block, - self.config().follow_distance, + service.config().follow_distance, ) .await?; @@ -368,13 +366,13 @@ impl Service { }; let logs_imported = - stream::try_unfold(block_number_chunks.into_iter(), |mut chunks| async move { + stream::try_unfold(block_number_chunks.into_iter(), |mut chunks| async { match chunks.next() { Some(chunk) => { let chunk_1 = chunk.clone(); match get_deposit_logs_in_range( - &self.config().endpoint, - &self.config().deposit_contract_address, + &service.config().endpoint, + &service.config().deposit_contract_address, chunk, Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS), ) @@ -388,7 +386,7 @@ impl Service { } }) .try_fold(0, |mut sum: usize, (block_range, log_chunk)| async move { - let mut cache = self.deposits().write(); + let mut cache = service.deposits().write(); log_chunk .into_iter() @@ -438,18 +436,18 @@ impl Service { if logs_imported > 0 { info!( - self.log, + service.log, "Imported deposit log(s)"; - "latest_block" => self.inner.deposit_cache.read().cache.latest_block_number(), - "total" => self.deposit_cache_len(), + "latest_block" => service.inner.deposit_cache.read().cache.latest_block_number(), + "total" => service.deposit_cache_len(), "new" => logs_imported ); } else { debug!( - self.log, + service.log, "No new deposits found"; - "latest_block" => self.inner.deposit_cache.read().cache.latest_block_number(), - "total_deposits" => self.deposit_cache_len(), + "latest_block" => service.inner.deposit_cache.read().cache.latest_block_number(), + "total_deposits" => service.deposit_cache_len(), ); } @@ -467,25 +465,25 @@ impl Service { /// - Err(_) if there is an error. /// /// Emits logs for debugging and errors. - pub async fn update_block_cache(&self) -> Result { - let block_cache_truncation = self.config().block_cache_truncation; - let max_blocks_per_update = self + pub async fn update_block_cache(service: Self) -> Result { + let block_cache_truncation = service.config().block_cache_truncation; + let max_blocks_per_update = service .config() .max_blocks_per_update .unwrap_or_else(usize::max_value); - let next_required_block = self + let next_required_block = service .inner .block_cache .read() .highest_block_number() .map(|n| n + 1) - .unwrap_or_else(|| self.config().lowest_cached_block_number); + .unwrap_or_else(|| service.config().lowest_cached_block_number); let range = get_new_block_numbers( - &self.config().endpoint, + &service.config().endpoint, next_required_block, - self.config().follow_distance, + service.config().follow_distance, ) .await?; // Map the range of required blocks into a Vec. @@ -507,7 +505,7 @@ impl Service { // If the range of required blocks is larger than `max_size`, drop all // existing blocks and download `max_size` count of blocks. let first_block = range.end() - max_size; - (*self.inner.block_cache.write()) = BlockCache::default(); + (*service.inner.block_cache.write()) = BlockCache::default(); (first_block..=*range.end()).collect::>() } else { range.collect::>() @@ -518,7 +516,7 @@ impl Service { }; // Download the range of blocks and sequentially import them into the cache. // Last processed block in deposit cache - let latest_in_cache = self + let latest_in_cache = service .inner .deposit_cache .read() @@ -538,7 +536,7 @@ impl Service { |mut block_numbers| async move { match block_numbers.next() { Some(block_number) => { - match download_eth1_block(self.inner.clone(), block_number).await { + match download_eth1_block(service.inner.clone(), block_number).await { Ok(eth1_block) => Ok(Some((eth1_block, block_numbers))), Err(e) => Err(e), } @@ -550,7 +548,8 @@ impl Service { let blocks_imported = eth1_blocks .try_fold(0, |sum: usize, eth1_block| async move { - self.inner + service + .inner .block_cache .write() .insert_root_or_child(eth1_block) @@ -558,11 +557,12 @@ impl Service { metrics::set_gauge( &metrics::BLOCK_CACHE_LEN, - self.inner.block_cache.read().len() as i64, + service.inner.block_cache.read().len() as i64, ); metrics::set_gauge( &metrics::LATEST_CACHED_BLOCK_TIMESTAMP, - self.inner + service + .inner .block_cache .read() .latest_block_timestamp() @@ -574,14 +574,14 @@ impl Service { .await?; // Prune the block cache, preventing it from growing too large. - self.inner.prune_blocks(); + service.inner.prune_blocks(); metrics::set_gauge( &metrics::BLOCK_CACHE_LEN, - self.inner.block_cache.read().len() as i64, + service.inner.block_cache.read().len() as i64, ); - let block_cache = self.inner.block_cache.read(); + let block_cache = service.inner.block_cache.read(); let latest_block_mins = block_cache .latest_block_timestamp() .and_then(|timestamp| { @@ -595,7 +595,7 @@ impl Service { if blocks_imported > 0 { info!( - self.log, + service.log, "Imported eth1 block(s)"; "latest_block_age" => latest_block_mins, "latest_block" => block_cache.highest_block_number(), @@ -604,14 +604,14 @@ impl Service { ); } else { debug!( - self.log, + service.log, "No new eth1 blocks imported"; "latest_block" => block_cache.highest_block_number(), "cached_blocks" => block_cache.len(), ); } - let block_cache = self.inner.block_cache.read(); + let block_cache = service.inner.block_cache.read(); let latest_block_mins = block_cache .latest_block_timestamp() .and_then(|timestamp| { @@ -625,7 +625,7 @@ impl Service { if blocks_imported > 0 { debug!( - self.log, + service.log, "Imported eth1 block(s)"; "latest_block_age" => latest_block_mins, "latest_block" => block_cache.highest_block_number(), @@ -634,7 +634,7 @@ impl Service { ); } else { debug!( - self.log, + service.log, "No new eth1 blocks imported"; "latest_block" => block_cache.highest_block_number(), "cached_blocks" => block_cache.len(), @@ -643,7 +643,7 @@ impl Service { Ok(BlockCacheUpdateOutcome { blocks_imported, - head_block_number: self.inner.block_cache.read().highest_block_number(), + head_block_number: service.inner.block_cache.read().highest_block_number(), }) } }