Port genesis service to stable futures

This commit is contained in:
pawan
2020-03-01 01:51:34 +05:30
committed by Age Manning
parent bdf89513c6
commit a80f40f155
2 changed files with 76 additions and 114 deletions

View File

@@ -6,10 +6,9 @@ edition = "2018"
[dev-dependencies] [dev-dependencies]
eth1_test_rig = { path = "../../tests/eth1_test_rig" } eth1_test_rig = { path = "../../tests/eth1_test_rig" }
futures = "0.1.25"
[dependencies] [dependencies]
futures = "0.1.25" futures = "0.3"
types = { path = "../../eth2/types"} types = { path = "../../eth2/types"}
environment = { path = "../../lighthouse/environment"} environment = { path = "../../lighthouse/environment"}
eth1 = { path = "../eth1"} eth1 = { path = "../eth1"}
@@ -19,10 +18,10 @@ merkle_proof = { path = "../../eth2/utils/merkle_proof" }
eth2_ssz = "0.1" eth2_ssz = "0.1"
eth2_hashing = { path = "../../eth2/utils/eth2_hashing" } eth2_hashing = { path = "../../eth2/utils/eth2_hashing" }
tree_hash = "0.1" tree_hash = "0.1"
tokio = "0.1.22" tokio = {version = "0.2", features = ["full"]}
parking_lot = "0.7" parking_lot = "0.7"
slog = "^2.2.3" slog = "^2.2.3"
exit-future = "0.1.4" exit-future = "0.2"
serde = "1.0" serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
int_to_bytes = { path = "../../eth2/utils/int_to_bytes" } int_to_bytes = { path = "../../eth2/utils/int_to_bytes" }

View File

@@ -2,11 +2,6 @@ pub use crate::{common::genesis_deposits, interop::interop_genesis_state};
pub use eth1::Config as Eth1Config; pub use eth1::Config as Eth1Config;
use eth1::{DepositLog, Eth1Block, Service}; use eth1::{DepositLog, Eth1Block, Service};
use futures::{
future,
future::{loop_fn, Loop},
Future,
};
use parking_lot::Mutex; use parking_lot::Mutex;
use slog::{debug, error, info, trace, Logger}; use slog::{debug, error, info, trace, Logger};
use state_processing::{ use state_processing::{
@@ -14,8 +9,8 @@ use state_processing::{
per_block_processing::process_deposit, process_activations, per_block_processing::process_deposit, process_activations,
}; };
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::Duration;
use tokio::timer::Delay; use tokio::time::delay_for;
use types::{BeaconState, ChainSpec, Deposit, Eth1Data, EthSpec, Hash256}; use types::{BeaconState, ChainSpec, Deposit, Eth1Data, EthSpec, Hash256};
/// Provides a service that connects to some Eth1 HTTP JSON-RPC endpoint and maintains a cache of eth1 /// Provides a service that connects to some Eth1 HTTP JSON-RPC endpoint and maintains a cache of eth1
@@ -87,33 +82,24 @@ impl Eth1GenesisService {
/// ///
/// - `Ok(state)` once the canonical eth2 genesis state has been discovered. /// - `Ok(state)` once the canonical eth2 genesis state has been discovered.
/// - `Err(e)` if there is some internal error during updates. /// - `Err(e)` if there is some internal error during updates.
pub fn wait_for_genesis_state<E: EthSpec>( pub async fn wait_for_genesis_state<E: EthSpec>(
&self, &self,
update_interval: Duration, update_interval: Duration,
spec: ChainSpec, spec: ChainSpec,
) -> impl Future<Item = BeaconState<E>, Error = String> { ) -> Result<BeaconState<E>, String> {
let service = self.clone(); let service = self.clone();
loop_fn::<(ChainSpec, Option<BeaconState<E>>), _, _, _>(
(spec, None),
move |(spec, state)| {
let service_1 = service.clone();
let service_2 = service.clone();
let service_3 = service.clone();
let service_4 = service.clone();
let log = service.core.log.clone(); let log = service.core.log.clone();
let min_genesis_active_validator_count = spec.min_genesis_active_validator_count; let min_genesis_active_validator_count = spec.min_genesis_active_validator_count;
let min_genesis_time = spec.min_genesis_time; let min_genesis_time = spec.min_genesis_time;
loop {
Delay::new(Instant::now() + update_interval) // **WARNING** `delay_for` panics on error
.map_err(|e| format!("Delay between genesis deposit checks failed: {:?}", e)) delay_for(update_interval).await;
.and_then(move |()| { let update_result = self
service_1
.core .core
.update_deposit_cache() .update_deposit_cache()
.map_err(|e| format!("{:?}", e)) .await
}) .map_err(|e| format!("{:?}", e));
.then(move |update_result| {
if let Err(e) = update_result { if let Err(e) = update_result {
error!( error!(
log, log,
@@ -123,81 +109,58 @@ impl Eth1GenesisService {
} }
// Do not exit the loop if there is an error whilst updating. // Do not exit the loop if there is an error whilst updating.
Ok(())
})
// Only enable the `sync_blocks` flag if there are enough deposits to feasibly // Only enable the `sync_blocks` flag if there are enough deposits to feasibly
// trigger genesis. // trigger genesis.
// //
// Note: genesis is triggered by the _active_ validator count, not just the // Note: genesis is triggered by the _active_ validator count, not just the
// deposit count, so it's possible that block downloads are started too early. // deposit count, so it's possible that block downloads are started too early.
// This is just wasteful, not erroneous. // This is just wasteful, not erroneous.
.and_then(move |()| { let mut sync_blocks = self.sync_blocks.lock();
let mut sync_blocks = service_2.sync_blocks.lock();
if !(*sync_blocks) { if !(*sync_blocks) {
if let Some(viable_eth1_block) = service_2.first_viable_eth1_block( if let Some(viable_eth1_block) =
min_genesis_active_validator_count as usize, self.first_viable_eth1_block(min_genesis_active_validator_count as usize)
) { {
info!( info!(
service_2.core.log, log,
"Minimum genesis deposit count met"; "Minimum genesis deposit count met";
"deposit_count" => min_genesis_active_validator_count, "deposit_count" => min_genesis_active_validator_count,
"block_number" => viable_eth1_block, "block_number" => viable_eth1_block,
); );
service_2.core.set_lowest_cached_block(viable_eth1_block); self.core.set_lowest_cached_block(viable_eth1_block);
*sync_blocks = true *sync_blocks = true
} }
} }
Ok(*sync_blocks) let should_update_block_cache = *sync_blocks;
})
.and_then(move |should_update_block_cache| {
let maybe_update_future: Box<dyn Future<Item = _, Error = _> + Send> =
if should_update_block_cache { if should_update_block_cache {
Box::new(service_3.core.update_block_cache().then( let update_result = self.core.update_block_cache().await;
move |update_result| {
if let Err(e) = update_result { if let Err(e) = update_result {
error!( error!(
service_3.core.log, log,
"Failed to update eth1 block cache"; "Failed to update eth1 block cache";
"error" => format!("{:?}", e) "error" => format!("{:?}", e)
); );
} }
// Do not exit the loop if there is an error whilst updating.
Ok(())
},
))
} else {
Box::new(future::ok(()))
}; };
if let Some(genesis_state) = self
maybe_update_future
})
.and_then(move |()| {
if let Some(genesis_state) = service_4
.scan_new_blocks::<E>(&spec) .scan_new_blocks::<E>(&spec)
.map_err(|e| format!("Failed to scan for new blocks: {}", e))? .map_err(|e| format!("Failed to scan for new blocks: {}", e))?
{ {
Ok(Loop::Break((spec, genesis_state))) break Ok(genesis_state);
} else { } else {
debug!( debug!(
service_4.core.log, log,
"No eth1 genesis block found"; "No eth1 genesis block found";
"latest_block_timestamp" => service_4.core.latest_block_timestamp(), "latest_block_timestamp" => self.core.latest_block_timestamp(),
"min_genesis_time" => min_genesis_time, "min_genesis_time" => min_genesis_time,
"min_validator_count" => min_genesis_active_validator_count, "min_validator_count" => min_genesis_active_validator_count,
"cached_blocks" => service_4.core.block_cache_len(), "cached_blocks" => self.core.block_cache_len(),
"cached_deposits" => service_4.core.deposit_cache_len(), "cached_deposits" => self.core.deposit_cache_len(),
"cache_head" => service_4.highest_known_block(), "cache_head" => self.highest_known_block(),
); );
Ok(Loop::Continue((spec, state)))
} }
}) }
},
)
.map(|(_spec, state)| state)
} }
/// Processes any new blocks that have appeared since this function was last run. /// Processes any new blocks that have appeared since this function was last run.