Builder update

This commit is contained in:
Age Manning
2020-05-08 15:06:52 +10:00
parent 167530e3f4
commit c4d5af81df
6 changed files with 171 additions and 230 deletions

View File

@@ -285,8 +285,8 @@ impl<T: EthSpec, S: Store<T>> CachingEth1Backend<T, S> {
} }
/// Starts the routine which connects to the external eth1 node and updates the caches. /// Starts the routine which connects to the external eth1 node and updates the caches.
pub async fn start(&self, exit: tokio::sync::oneshot::Receiver<()>) -> Result<(), ()> { pub async fn start(&self, exit: tokio::sync::oneshot::Receiver<()>) {
self.core.auto_update(exit).await tokio::spawn(async move { self.core.auto_update(exit).await });
} }
/// Instantiates `self` from an existing service. /// Instantiates `self` from an existing service.

View File

@@ -13,7 +13,6 @@ use environment::RuntimeContext;
use eth1::{Config as Eth1Config, Service as Eth1Service}; use eth1::{Config as Eth1Config, Service as Eth1Service};
use eth2_config::Eth2Config; use eth2_config::Eth2Config;
use eth2_libp2p::NetworkGlobals; use eth2_libp2p::NetworkGlobals;
use futures::{future, Future, IntoFuture};
use genesis::{interop_genesis_state, Eth1GenesisService}; use genesis::{interop_genesis_state, Eth1GenesisService};
use network::{NetworkConfig, NetworkMessage, NetworkService}; use network::{NetworkConfig, NetworkMessage, NetworkService};
use slog::info; use slog::info;
@@ -109,45 +108,33 @@ where
/// Initializes the `BeaconChainBuilder`. The `build_beacon_chain` method will need to be /// Initializes the `BeaconChainBuilder`. The `build_beacon_chain` method will need to be
/// called later in order to actually instantiate the `BeaconChain`. /// called later in order to actually instantiate the `BeaconChain`.
pub fn beacon_chain_builder( pub async fn beacon_chain_builder(
mut self, mut self,
client_genesis: ClientGenesis, client_genesis: ClientGenesis,
config: ClientConfig, config: ClientConfig,
) -> impl Future<Item = Self, Error = String> { ) -> Result<Self, String> {
let store = self.store.clone(); let store = self
let store_migrator = self.store_migrator.take(); .store
let chain_spec = self.chain_spec.clone();
let runtime_context = self.runtime_context.clone();
let eth_spec_instance = self.eth_spec_instance.clone();
let data_dir = config.data_dir.clone();
let disabled_forks = config.disabled_forks.clone();
future::ok(())
.and_then(move |()| {
let store = store
.ok_or_else(|| "beacon_chain_start_method requires a store".to_string())?; .ok_or_else(|| "beacon_chain_start_method requires a store".to_string())?;
let store_migrator = store_migrator.ok_or_else(|| { let store_migrator = self
"beacon_chain_start_method requires a store migrator".to_string() .store_migrator
})?; .ok_or_else(|| "beacon_chain_start_method requires a store migrator".to_string())?;
let context = runtime_context let context = self
.ok_or_else(|| { .runtime_context
"beacon_chain_start_method requires a runtime context".to_string() .ok_or_else(|| "beacon_chain_start_method requires a runtime context".to_string())?
})?
.service_context("beacon".into()); .service_context("beacon".into());
let spec = chain_spec let spec = self
.chain_spec
.ok_or_else(|| "beacon_chain_start_method requires a chain spec".to_string())?; .ok_or_else(|| "beacon_chain_start_method requires a chain spec".to_string())?;
let builder = BeaconChainBuilder::new(eth_spec_instance) let builder = BeaconChainBuilder::new(self.eth_spec_instance)
.logger(context.log.clone()) .logger(context.log.clone())
.store(store) .store(store)
.store_migrator(store_migrator) .store_migrator(store_migrator)
.data_dir(data_dir) .data_dir(config.data_dir)
.custom_spec(spec.clone()) .custom_spec(spec.clone())
.disabled_forks(disabled_forks); .disabled_forks(config.disabled_forks);
Ok((builder, spec, context))
})
.and_then(move |(builder, spec, context)| {
let chain_exists = builder let chain_exists = builder
.store_contains_beacon_chain() .store_contains_beacon_chain()
.unwrap_or_else(|_| false); .unwrap_or_else(|_| false);
@@ -158,8 +145,7 @@ where
// //
// Alternatively, if there's a beacon chain in the database then always resume // Alternatively, if there's a beacon chain in the database then always resume
// using it. // using it.
let client_genesis = if client_genesis == ClientGenesis::FromStore && !chain_exists let client_genesis = if client_genesis == ClientGenesis::FromStore && !chain_exists {
{
info!(context.log, "Defaulting to deposit contract genesis"); info!(context.log, "Defaulting to deposit contract genesis");
ClientGenesis::DepositContract ClientGenesis::DepositContract
@@ -169,21 +155,14 @@ where
client_genesis client_genesis
}; };
let genesis_state_future: Box<dyn Future<Item = _, Error = _> + Send> = let (beacon_chain_builder, eth1_service_option) = match client_genesis {
match client_genesis {
ClientGenesis::Interop { ClientGenesis::Interop {
validator_count, validator_count,
genesis_time, genesis_time,
} => { } => {
let keypairs = generate_deterministic_keypairs(validator_count); let keypairs = generate_deterministic_keypairs(validator_count);
let result = interop_genesis_state(&keypairs, genesis_time, &spec); let genesis_state = interop_genesis_state(&keypairs, genesis_time, &spec)?;
builder.genesis_state(genesis_state).map(|v| (v, None))?
let future = result
.and_then(move |genesis_state| builder.genesis_state(genesis_state))
.into_future()
.map(|v| (v, None));
Box::new(future)
} }
ClientGenesis::SszBytes { ClientGenesis::SszBytes {
genesis_state_bytes, genesis_state_bytes,
@@ -193,15 +172,10 @@ where
"Starting from known genesis state"; "Starting from known genesis state";
); );
let result = BeaconState::from_ssz_bytes(&genesis_state_bytes) let genesis_state = BeaconState::from_ssz_bytes(&genesis_state_bytes)
.map_err(|e| format!("Unable to parse genesis state SSZ: {:?}", e)); .map_err(|e| format!("Unable to parse genesis state SSZ: {:?}", e))?;
let future = result builder.genesis_state(genesis_state).map(|v| (v, None))?
.and_then(move |genesis_state| builder.genesis_state(genesis_state))
.into_future()
.map(|v| (v, None));
Box::new(future)
} }
ClientGenesis::DepositContract => { ClientGenesis::DepositContract => {
info!( info!(
@@ -212,33 +186,25 @@ where
"deposit_contract" => &config.eth1.deposit_contract_address "deposit_contract" => &config.eth1.deposit_contract_address
); );
let genesis_service = let genesis_service = Eth1GenesisService::new(config.eth1, context.log.clone());
Eth1GenesisService::new(config.eth1, context.log.clone());
let future = genesis_service let genesis_state = genesis_service
.wait_for_genesis_state( .wait_for_genesis_state(
Duration::from_millis(ETH1_GENESIS_UPDATE_INTERVAL_MILLIS), Duration::from_millis(ETH1_GENESIS_UPDATE_INTERVAL_MILLIS),
context.eth2_config().spec.clone(), context.eth2_config().spec.clone(),
) )
.and_then(move |genesis_state| builder.genesis_state(genesis_state)) .await?;
.map(|v| (v, Some(genesis_service.into_core_service())));
Box::new(future) builder
} .genesis_state(genesis_state)
ClientGenesis::FromStore => { .map(|v| (v, Some(genesis_service.into_core_service())))?
let future = builder.resume_from_db().into_future().map(|v| (v, None));
Box::new(future)
} }
ClientGenesis::FromStore => builder.resume_from_db().map(|v| (v, None))?,
}; };
genesis_state_future
})
.map(move |(beacon_chain_builder, eth1_service_option)| {
self.eth1_service = eth1_service_option; self.eth1_service = eth1_service_option;
self.beacon_chain_builder = Some(beacon_chain_builder); self.beacon_chain_builder = Some(beacon_chain_builder);
self Ok(self)
})
} }
/// Immediately starts the networking stack. /// Immediately starts the networking stack.
@@ -254,7 +220,7 @@ where
.service_context("network".into()); .service_context("network".into());
let (network_globals, network_send, network_exit) = let (network_globals, network_send, network_exit) =
NetworkService::start(beacon_chain, config, &context.executor, context.log) NetworkService::start(beacon_chain, config, &context.runtime_handle, context.log)
.map_err(|e| format!("Failed to start network: {:?}", e))?; .map_err(|e| format!("Failed to start network: {:?}", e))?;
self.network_globals = Some(network_globals); self.network_globals = Some(network_globals);
@@ -281,12 +247,9 @@ where
.ok_or_else(|| "node timer requires a chain spec".to_string())? .ok_or_else(|| "node timer requires a chain spec".to_string())?
.milliseconds_per_slot; .milliseconds_per_slot;
let timer_exit = timer::spawn( let timer_exit = context
&context.executor, .runtime_handle
beacon_chain, .enter(|| timer::spawn(beacon_chain, milliseconds_per_slot))
milliseconds_per_slot,
context.log,
)
.map_err(|e| format!("Unable to start node timer: {}", e))?; .map_err(|e| format!("Unable to start node timer: {}", e))?;
self.exit_channels.push(timer_exit); self.exit_channels.push(timer_exit);
@@ -323,9 +286,9 @@ where
network_chan: network_send, network_chan: network_send,
}; };
let (exit_channel, listening_addr) = rest_api::start_server( let (exit_channel, listening_addr) = context.runtime_handle.enter(|| {
rest_api::start_server(
&client_config.rest_api, &client_config.rest_api,
&context.executor,
beacon_chain, beacon_chain,
network_info, network_info,
client_config client_config
@@ -337,7 +300,8 @@ where
eth2_config.clone(), eth2_config.clone(),
context.log, context.log,
) )
.map_err(|e| format!("Failed to start HTTP API: {:?}", e))?; .map_err(|e| format!("Failed to start HTTP API: {:?}", e))
})?;
self.exit_channels.push(exit_channel); self.exit_channels.push(exit_channel);
self.http_listen_addr = Some(listening_addr); self.http_listen_addr = Some(listening_addr);
@@ -472,8 +436,9 @@ where
Option<_>, Option<_>,
Option<_>, Option<_>,
) = if config.enabled { ) = if config.enabled {
let (sender, exit, listening_addr) = let (sender, exit, listening_addr) = context
websocket_server::start_server(&config, &context.executor, &context.log)?; .runtime_handle
.enter(|| websocket_server::start_server(&config, &context.log))?;
(sender, Some(exit), Some(listening_addr)) (sender, Some(exit), Some(listening_addr))
} else { } else {
(WebSocketSender::dummy(), None, None) (WebSocketSender::dummy(), None, None)
@@ -692,7 +657,7 @@ where
}; };
// Starts the service that connects to an eth1 node and periodically updates caches. // Starts the service that connects to an eth1 node and periodically updates caches.
context.executor.spawn(backend.start(exit)); context.runtime_handle.spawn(backend.start(exit));
self.beacon_chain_builder = Some(beacon_chain_builder.eth1_backend(Some(backend))); self.beacon_chain_builder = Some(beacon_chain_builder.eth1_backend(Some(backend)));

View File

@@ -246,54 +246,39 @@ impl Service {
pub async fn update( pub async fn update(
&self, &self,
) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> { ) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> {
let deposit_future = self let update_deposit_cache = async {
let outcome = self
.update_deposit_cache() .update_deposit_cache()
.map_err(|e| format!("Failed to update eth1 cache: {:?}", e)) .await
.then(|result| async move{ .map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?;
match &result {
Ok(DepositCacheUpdateOutcome { logs_imported }) => trace!( trace!(
self.log, self.log,
"Updated eth1 deposit cache"; "Updated eth1 deposit cache";
"cached_deposits" => self.inner.deposit_cache.read().cache.len(), "cached_deposits" => self.inner.deposit_cache.read().cache.len(),
"logs_imported" => logs_imported, "logs_imported" => outcome.logs_imported,
"last_processed_eth1_block" => self.inner.deposit_cache.read().last_processed_block, "last_processed_eth1_block" => self.inner.deposit_cache.read().last_processed_block,
), );
Err(e) => error!( Ok(outcome)
self.log,
"Failed to update eth1 deposit cache";
"error" => e
),
}; };
result let update_block_cache = async {
}); let outcome = self
let block_future = self
.update_block_cache() .update_block_cache()
.map_err(|e| format!("Failed to update eth1 cache: {:?}", e)) .await
.then(|result| async move { .map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?;
match &result {
Ok(BlockCacheUpdateOutcome { trace!(
blocks_imported,
head_block_number,
}) => trace!(
self.log, self.log,
"Updated eth1 block cache"; "Updated eth1 block cache";
"cached_blocks" => self.inner.block_cache.read().len(), "cached_blocks" => self.inner.block_cache.read().len(),
"blocks_imported" => blocks_imported, "blocks_imported" => outcome.blocks_imported,
"head_block" => head_block_number, "head_block" => outcome.head_block_number,
), );
Err(e) => error!( Ok(outcome)
self.log,
"Failed to update eth1 block cache";
"error" => e
),
}; };
result futures::try_join!(update_deposit_cache, update_block_cache)
});
futures::try_join!(deposit_future, block_future)
} }
/// A looping future that updates the cache, then waits `config.auto_update_interval` before /// A looping future that updates the cache, then waits `config.auto_update_interval` before
@@ -305,13 +290,12 @@ impl Service {
/// - Err(_) if there is an error. /// - Err(_) if there is an error.
/// ///
/// Emits logs for debugging and errors. /// Emits logs for debugging and errors.
pub async fn auto_update( pub async fn auto_update(&self, exit: tokio::sync::oneshot::Receiver<()>) {
&self,
mut exit: tokio::sync::oneshot::Receiver<()>,
) -> Result<(), ()> {
let update_interval = Duration::from_millis(self.config().auto_update_interval_millis); let update_interval = Duration::from_millis(self.config().auto_update_interval_millis);
loop { loop {
let update_future = async move {
/*
let update_result = self.update().await; let update_result = self.update().await;
match update_result { match update_result {
@@ -329,16 +313,17 @@ impl Service {
"deposits" => format!("{:?}", deposit), "deposits" => format!("{:?}", deposit),
), ),
}; };
*/
// WARNING: delay_for doesn't return an error and panics on error. // WARNING: delay_for doesn't return an error and panics on error.
delay_for(update_interval).await; delay_for(update_interval).await;
match exit.try_recv() { };
Ok(_) | Err(TryRecvError::Closed) => break, if let futures::future::Either::Right(_) =
Err(TryRecvError::Empty) => {} futures::future::select(Box::pin(update_future), futures::future::ready(())).await
{
// the exit future returned end
break;
} }
} }
Ok(())
} }
/// Contacts the remote eth1 node and attempts to import deposit logs up to the configured /// Contacts the remote eth1 node and attempts to import deposit logs up to the configured

View File

@@ -35,7 +35,6 @@ use std::net::SocketAddr;
use std::ops::Deref; use std::ops::Deref;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use tokio::runtime::Handle;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use url_query::UrlQuery; use url_query::UrlQuery;
@@ -53,7 +52,6 @@ pub struct NetworkInfo<T: BeaconChainTypes> {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn start_server<T: BeaconChainTypes>( pub fn start_server<T: BeaconChainTypes>(
config: &Config, config: &Config,
handle: &Handle,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
network_info: NetworkInfo<T>, network_info: NetworkInfo<T>,
db_path: PathBuf, db_path: PathBuf,
@@ -130,7 +128,7 @@ pub fn start_server<T: BeaconChainTypes>(
"port" => actual_listen_addr.port(), "port" => actual_listen_addr.port(),
); );
handle.spawn(server_future); tokio::spawn(server_future);
Ok((exit_signal, actual_listen_addr)) Ok((exit_signal, actual_listen_addr))
} }

View File

@@ -3,12 +3,11 @@
//! This service allows task execution on the beacon node for various functionality. //! This service allows task execution on the beacon node for various functionality.
use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_chain::{BeaconChain, BeaconChainTypes};
use futures::future;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use futures::{future, prelude::*};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::runtime::Handle;
use tokio::time::{interval_at, Instant}; use tokio::time::{interval_at, Instant};
/// Spawns a timer service which periodically executes tasks for the beacon chain /// Spawns a timer service which periodically executes tasks for the beacon chain
@@ -16,7 +15,6 @@ use tokio::time::{interval_at, Instant};
/// called from the context of a runtime and we can simply spawn using task::spawn. /// called from the context of a runtime and we can simply spawn using task::spawn.
/// Check for issues without the Handle. /// Check for issues without the Handle.
pub fn spawn<T: BeaconChainTypes>( pub fn spawn<T: BeaconChainTypes>(
handle: &Handle,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
milliseconds_per_slot: u64, milliseconds_per_slot: u64,
) -> Result<tokio::sync::oneshot::Sender<()>, &'static str> { ) -> Result<tokio::sync::oneshot::Sender<()>, &'static str> {
@@ -35,8 +33,8 @@ pub fn spawn<T: BeaconChainTypes>(
future::ready(()) future::ready(())
}); });
let future = futures::future::select(timer_future, exit.map_err(|_| ()).map(|_| ())); let future = futures::future::select(timer_future, exit);
handle.spawn(future); tokio::spawn(future);
Ok(exit_signal) Ok(exit_signal)
} }

View File

@@ -2,7 +2,6 @@ use futures::future::TryFutureExt;
use slog::{debug, error, info, warn, Logger}; use slog::{debug, error, info, warn, Logger};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::net::SocketAddr; use std::net::SocketAddr;
use tokio::runtime::Handle;
use types::EthSpec; use types::EthSpec;
use ws::{Sender, WebSocket}; use ws::{Sender, WebSocket};
@@ -37,7 +36,6 @@ impl<T: EthSpec> WebSocketSender<T> {
pub fn start_server<T: EthSpec>( pub fn start_server<T: EthSpec>(
config: &Config, config: &Config,
handle: &Handle,
log: &Logger, log: &Logger,
) -> Result< ) -> Result<
( (
@@ -92,15 +90,12 @@ pub fn start_server<T: EthSpec>(
// Place a future on the handle that will shutdown the websocket server when the // Place a future on the handle that will shutdown the websocket server when the
// application exits. // application exits.
// TODO: check if we should spawn using a `Handle` or using `task::spawn` tokio::spawn(exit_future);
handle.spawn(exit_future);
exit_channel exit_channel
}; };
let log_inner = log.clone(); let log_inner = log.clone();
// TODO: using tokio `spawn_blocking` instead of `thread::spawn`
// Check which is more apt.
let _handle = tokio::task::spawn_blocking(move || match server.run() { let _handle = tokio::task::spawn_blocking(move || match server.run() {
Ok(_) => { Ok(_) => {
debug!( debug!(