diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index 48b90f4974..3360140eb7 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -280,32 +280,34 @@ impl Service { /// /// Emits logs for debugging and errors. pub async fn update( - service: Self, + &self, ) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> { let update_deposit_cache = async { - let outcome = Service::update_deposit_cache(service.clone()) + let outcome = self + .update_deposit_cache() .await .map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?; trace!( - service.log, + self.log, "Updated eth1 deposit cache"; - "cached_deposits" => service.inner.deposit_cache.read().cache.len(), + "cached_deposits" => self.inner.deposit_cache.read().cache.len(), "logs_imported" => outcome.logs_imported, - "last_processed_eth1_block" => service.inner.deposit_cache.read().last_processed_block, + "last_processed_eth1_block" => self.inner.deposit_cache.read().last_processed_block, ); Ok(outcome) }; let update_block_cache = async { - let outcome = Service::update_block_cache(service.clone()) + let outcome = self + .update_block_cache() .await .map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?; trace!( - service.log, + self.log, "Updated eth1 block cache"; - "cached_blocks" => service.inner.block_cache.read().len(), + "cached_blocks" => self.inner.block_cache.read().len(), "blocks_imported" => outcome.blocks_imported, "head_block" => outcome.head_block_number, ); @@ -324,33 +326,31 @@ impl Service { /// - Err(_) if there is an error. /// /// Emits logs for debugging and errors. - pub fn auto_update(service: Self, handle: environment::TaskExecutor) { - let update_interval = Duration::from_millis(service.config().auto_update_interval_millis); + pub fn auto_update(self, handle: environment::TaskExecutor) { + let update_interval = Duration::from_millis(self.config().auto_update_interval_millis); let mut interval = interval_at(Instant::now(), update_interval); let update_future = async move { while interval.next().await.is_some() { - Service::do_update(service.clone(), update_interval) - .await - .ok(); + self.do_update(update_interval).await.ok(); } }; handle.spawn(update_future, "eth1"); } - async fn do_update(service: Self, update_interval: Duration) -> Result<(), ()> { - let update_result = Service::update(service.clone()).await; + async fn do_update(&self, update_interval: Duration) -> Result<(), ()> { + let update_result = self.update().await; match update_result { Err(e) => error!( - service.log, + self.log, "Failed to update eth1 cache"; "retry_millis" => update_interval.as_millis(), "error" => e, ), Ok((deposit, block)) => debug!( - service.log, + self.log, "Updated eth1 cache"; "retry_millis" => update_interval.as_millis(), "blocks" => format!("{:?}", block), @@ -372,23 +372,23 @@ impl Service { /// - Err(_) if there is an error. /// /// Emits logs for debugging and errors. - pub async fn update_deposit_cache(service: Self) -> Result { - let endpoint = service.config().endpoint.clone(); - let follow_distance = service.config().follow_distance; - let deposit_contract_address = service.config().deposit_contract_address.clone(); + pub async fn update_deposit_cache(&self) -> Result { + let endpoint = self.config().endpoint.clone(); + let follow_distance = self.config().follow_distance; + let deposit_contract_address = self.config().deposit_contract_address.clone(); - let blocks_per_log_query = service.config().blocks_per_log_query; - let max_log_requests_per_update = service + let blocks_per_log_query = self.config().blocks_per_log_query; + let max_log_requests_per_update = self .config() .max_log_requests_per_update .unwrap_or_else(usize::max_value); - let next_required_block = service + let next_required_block = self .deposits() .read() .last_processed_block .map(|n| n + 1) - .unwrap_or_else(|| service.config().deposit_contract_deploy_block); + .unwrap_or_else(|| self.config().deposit_contract_deploy_block); let range = get_new_block_numbers(&endpoint, next_required_block, follow_distance).await?; @@ -432,7 +432,7 @@ impl Service { let mut logs_imported = 0; for (block_range, log_chunk) in logs.iter() { - let mut cache = service.deposits().write(); + let mut cache = self.deposits().write(); log_chunk .into_iter() .map(|raw_log| { @@ -478,18 +478,18 @@ impl Service { if logs_imported > 0 { info!( - service.log, + self.log, "Imported deposit log(s)"; - "latest_block" => service.inner.deposit_cache.read().cache.latest_block_number(), - "total" => service.deposit_cache_len(), + "latest_block" => self.inner.deposit_cache.read().cache.latest_block_number(), + "total" => self.deposit_cache_len(), "new" => logs_imported ); } else { debug!( - service.log, + self.log, "No new deposits found"; - "latest_block" => service.inner.deposit_cache.read().cache.latest_block_number(), - "total_deposits" => service.deposit_cache_len(), + "latest_block" => self.inner.deposit_cache.read().cache.latest_block_number(), + "total_deposits" => self.deposit_cache_len(), ); } @@ -507,23 +507,23 @@ impl Service { /// - Err(_) if there is an error. /// /// Emits logs for debugging and errors. - pub async fn update_block_cache(service: Self) -> Result { - let block_cache_truncation = service.config().block_cache_truncation; - let max_blocks_per_update = service + pub async fn update_block_cache(&self) -> Result { + let block_cache_truncation = self.config().block_cache_truncation; + let max_blocks_per_update = self .config() .max_blocks_per_update .unwrap_or_else(usize::max_value); - let next_required_block = service + let next_required_block = self .inner .block_cache .read() .highest_block_number() .map(|n| n + 1) - .unwrap_or_else(|| service.config().lowest_cached_block_number); + .unwrap_or_else(|| self.config().lowest_cached_block_number); - let endpoint = service.config().endpoint.clone(); - let follow_distance = service.config().follow_distance; + let endpoint = self.config().endpoint.clone(); + let follow_distance = self.config().follow_distance; let range = get_new_block_numbers(&endpoint, next_required_block, follow_distance).await?; // Map the range of required blocks into a Vec. @@ -545,7 +545,7 @@ impl Service { // If the range of required blocks is larger than `max_size`, drop all // existing blocks and download `max_size` count of blocks. let first_block = range.end() - max_size; - (*service.inner.block_cache.write()) = BlockCache::default(); + (*self.inner.block_cache.write()) = BlockCache::default(); (first_block..=*range.end()).collect::>() } else { range.collect::>() @@ -556,7 +556,7 @@ impl Service { }; // Download the range of blocks and sequentially import them into the cache. // Last processed block in deposit cache - let latest_in_cache = service + let latest_in_cache = self .inner .deposit_cache .read() @@ -576,7 +576,7 @@ impl Service { |mut block_numbers| async { match block_numbers.next() { Some(block_number) => { - match download_eth1_block(service.inner.clone(), block_number).await { + match download_eth1_block(self.inner.clone(), block_number).await { Ok(eth1_block) => Ok(Some((eth1_block, block_numbers))), Err(e) => Err(e), } @@ -590,8 +590,7 @@ impl Service { let mut blocks_imported = 0; for eth1_block in eth1_blocks { - service - .inner + self.inner .block_cache .write() .insert_root_or_child(eth1_block) @@ -599,12 +598,11 @@ impl Service { metrics::set_gauge( &metrics::BLOCK_CACHE_LEN, - service.inner.block_cache.read().len() as i64, + self.inner.block_cache.read().len() as i64, ); metrics::set_gauge( &metrics::LATEST_CACHED_BLOCK_TIMESTAMP, - service - .inner + self.inner .block_cache .read() .latest_block_timestamp() @@ -615,14 +613,14 @@ impl Service { } // Prune the block cache, preventing it from growing too large. - service.inner.prune_blocks(); + self.inner.prune_blocks(); metrics::set_gauge( &metrics::BLOCK_CACHE_LEN, - service.inner.block_cache.read().len() as i64, + self.inner.block_cache.read().len() as i64, ); - let block_cache = service.inner.block_cache.read(); + let block_cache = self.inner.block_cache.read(); let latest_block_mins = block_cache .latest_block_timestamp() .and_then(|timestamp| { @@ -636,7 +634,7 @@ impl Service { if blocks_imported > 0 { debug!( - service.log, + self.log, "Imported eth1 block(s)"; "latest_block_age" => latest_block_mins, "latest_block" => block_cache.highest_block_number(), @@ -645,7 +643,7 @@ impl Service { ); } else { debug!( - service.log, + self.log, "No new eth1 blocks imported"; "latest_block" => block_cache.highest_block_number(), "cached_blocks" => block_cache.len(), @@ -654,7 +652,7 @@ impl Service { Ok(BlockCacheUpdateOutcome { blocks_imported, - head_block_number: service.inner.block_cache.read().highest_block_number(), + head_block_number: self.inner.block_cache.read().highest_block_number(), }) } } diff --git a/beacon_node/eth1/tests/test.rs b/beacon_node/eth1/tests/test.rs index fbaf6b9498..1b75ebbac6 100644 --- a/beacon_node/eth1/tests/test.rs +++ b/beacon_node/eth1/tests/test.rs @@ -145,14 +145,17 @@ mod eth1_cache { eth1.ganache.evm_mine().await.expect("should mine block"); } - Service::update_deposit_cache(service.clone()) + service + .update_deposit_cache() .await .expect("should update deposit cache"); - Service::update_block_cache(service.clone()) + service + .update_block_cache() .await .expect("should update block cache"); - Service::update_block_cache(service.clone()) + service + .update_block_cache() .await .expect("should update cache when nothing has changed"); @@ -205,10 +208,12 @@ mod eth1_cache { eth1.ganache.evm_mine().await.expect("should mine block") } - Service::update_deposit_cache(service.clone()) + service + .update_deposit_cache() .await .expect("should update deposit cache"); - Service::update_block_cache(service.clone()) + service + .update_block_cache() .await .expect("should update block cache"); @@ -250,10 +255,12 @@ mod eth1_cache { for _ in 0..cache_len / 2 { eth1.ganache.evm_mine().await.expect("should mine block") } - Service::update_deposit_cache(service.clone()) + service + .update_deposit_cache() .await .expect("should update deposit cache"); - Service::update_block_cache(service.clone()) + service + .update_block_cache() .await .expect("should update block cache"); } @@ -293,15 +300,12 @@ mod eth1_cache { eth1.ganache.evm_mine().await.expect("should mine block") } futures::try_join!( - Service::update_deposit_cache(service.clone()), - Service::update_deposit_cache(service.clone()) + service.update_deposit_cache(), + service.update_deposit_cache() ) .expect("should perform two simultaneous updates of deposit cache"); - futures::try_join!( - Service::update_block_cache(service.clone()), - Service::update_block_cache(service.clone()) - ) - .expect("should perform two simultaneous updates of block cache"); + futures::try_join!(service.update_block_cache(), service.update_block_cache()) + .expect("should perform two simultaneous updates of block cache"); assert!(service.block_cache_len() >= n, "should grow the cache"); } @@ -346,11 +350,13 @@ mod deposit_tree { .expect("should perform a deposit"); } - Service::update_deposit_cache(service.clone()) + service + .update_deposit_cache() .await .expect("should perform update"); - Service::update_deposit_cache(service.clone()) + service + .update_deposit_cache() .await .expect("should perform update when nothing has changed"); @@ -420,8 +426,8 @@ mod deposit_tree { } futures::try_join!( - Service::update_deposit_cache(service.clone()), - Service::update_deposit_cache(service.clone()) + service.update_deposit_cache(), + service.update_deposit_cache() ) .expect("should perform two updates concurrently"); @@ -661,7 +667,8 @@ mod fast { eth1.ganache.evm_mine().await.expect("should mine block"); } - Service::update_deposit_cache(service.clone()) + service + .update_deposit_cache() .await .expect("should perform update"); @@ -728,7 +735,8 @@ mod persist { .expect("should perform a deposit"); } - Service::update_deposit_cache(service.clone()) + service + .update_deposit_cache() .await .expect("should perform update"); @@ -739,7 +747,8 @@ mod persist { let deposit_count = service.deposit_cache_len(); - Service::update_block_cache(service.clone()) + service + .update_block_cache() .await .expect("should perform update"); diff --git a/beacon_node/genesis/src/eth1_genesis_service.rs b/beacon_node/genesis/src/eth1_genesis_service.rs index 5c939e69aa..540c96be7f 100644 --- a/beacon_node/genesis/src/eth1_genesis_service.rs +++ b/beacon_node/genesis/src/eth1_genesis_service.rs @@ -113,7 +113,8 @@ impl Eth1GenesisService { ); loop { - let update_result = Eth1Service::update_deposit_cache(eth1_service.clone()) + let update_result = eth1_service + .update_deposit_cache() .await .map_err(|e| format!("{:?}", e)); @@ -154,8 +155,7 @@ impl Eth1GenesisService { } // Download new eth1 blocks into the cache. - let blocks_imported = match Eth1Service::update_block_cache(eth1_service.clone()).await - { + let blocks_imported = match eth1_service.update_block_cache().await { Ok(outcome) => { debug!( log,