Rename runtime_handle to executor

This commit is contained in:
pawan
2020-05-20 13:00:39 +05:30
parent 6d3503c05f
commit 8be6a4ecd5
13 changed files with 51 additions and 57 deletions

View File

@@ -222,7 +222,7 @@ where
.clone(); .clone();
let (network_globals, network_send) = let (network_globals, network_send) =
NetworkService::start(beacon_chain, config, context.runtime_handle, context.log) NetworkService::start(beacon_chain, config, context.executor, context.log)
.map_err(|e| format!("Failed to start network: {:?}", e))?; .map_err(|e| format!("Failed to start network: {:?}", e))?;
self.network_globals = Some(network_globals); self.network_globals = Some(network_globals);
@@ -249,7 +249,7 @@ where
.milliseconds_per_slot; .milliseconds_per_slot;
let _ = timer::spawn( let _ = timer::spawn(
context.runtime_handle, context.executor,
beacon_chain, beacon_chain,
milliseconds_per_slot, milliseconds_per_slot,
context.log.clone(), context.log.clone(),
@@ -290,7 +290,7 @@ where
let log = context.log.clone(); let log = context.log.clone();
let listening_addr = rest_api::start_server( let listening_addr = rest_api::start_server(
context.runtime_handle, context.executor,
&client_config.rest_api, &client_config.rest_api,
beacon_chain, beacon_chain,
network_info, network_info,
@@ -332,7 +332,7 @@ where
.milliseconds_per_slot; .milliseconds_per_slot;
let _ = spawn_notifier( let _ = spawn_notifier(
context.runtime_handle, context.executor,
beacon_chain, beacon_chain,
network_globals, network_globals,
milliseconds_per_slot, milliseconds_per_slot,
@@ -428,7 +428,7 @@ where
let (sender, listening_addr): (WebSocketSender<TEthSpec>, Option<_>) = if config.enabled { let (sender, listening_addr): (WebSocketSender<TEthSpec>, Option<_>) = if config.enabled {
let (sender, listening_addr) = let (sender, listening_addr) =
websocket_server::start_server(context.runtime_handle, &config, &context.log)?; websocket_server::start_server(context.executor, &config, &context.log)?;
(sender, Some(listening_addr)) (sender, Some(listening_addr))
} else { } else {
(WebSocketSender::dummy(), None) (WebSocketSender::dummy(), None)
@@ -638,7 +638,7 @@ where
self.eth1_service = None; self.eth1_service = None;
// Starts the service that connects to an eth1 node and periodically updates caches. // Starts the service that connects to an eth1 node and periodically updates caches.
backend.start(context.runtime_handle); backend.start(context.executor);
self.beacon_chain_builder = Some(beacon_chain_builder.eth1_backend(Some(backend))); self.beacon_chain_builder = Some(beacon_chain_builder.eth1_backend(Some(backend)));

View File

@@ -25,7 +25,7 @@ const SPEEDO_OBSERVATIONS: usize = 4;
/// Spawns a notifier service which periodically logs information about the node. /// Spawns a notifier service which periodically logs information about the node.
pub fn spawn_notifier<T: BeaconChainTypes>( pub fn spawn_notifier<T: BeaconChainTypes>(
handle: environment::TaskExecutor, executor: environment::TaskExecutor,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
network: Arc<NetworkGlobals<T::EthSpec>>, network: Arc<NetworkGlobals<T::EthSpec>>,
milliseconds_per_slot: u64, milliseconds_per_slot: u64,
@@ -149,7 +149,7 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
}; };
// run the notifier on the current executor // run the notifier on the current executor
handle.spawn(interval_future.unwrap_or_else(|_| ()), "beacon_notifier"); executor.spawn(interval_future.unwrap_or_else(|_| ()), "beacon_notifier");
Ok(()) Ok(())
} }

View File

@@ -53,7 +53,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
pub fn start( pub fn start(
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
config: &NetworkConfig, config: &NetworkConfig,
handle: environment::TaskExecutor, executor: environment::TaskExecutor,
network_log: slog::Logger, network_log: slog::Logger,
) -> error::Result<( ) -> error::Result<(
Arc<NetworkGlobals<T::EthSpec>>, Arc<NetworkGlobals<T::EthSpec>>,
@@ -74,7 +74,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
// launch libp2p service // launch libp2p service
let (network_globals, mut libp2p) = let (network_globals, mut libp2p) =
LibP2PService::new(handle.runtime_handle(), config, enr_fork_id, &network_log)?; LibP2PService::new(executor.runtime_handle(), config, enr_fork_id, &network_log)?;
for enr in load_dht::<T::Store, T::EthSpec>(store.clone()) { for enr in load_dht::<T::Store, T::EthSpec>(store.clone()) {
libp2p.swarm.add_enr(enr); libp2p.swarm.add_enr(enr);
@@ -87,7 +87,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
beacon_chain.clone(), beacon_chain.clone(),
network_globals.clone(), network_globals.clone(),
network_send.clone(), network_send.clone(),
handle.clone(), executor.clone(),
network_log.clone(), network_log.clone(),
)?; )?;
@@ -110,21 +110,20 @@ impl<T: BeaconChainTypes> NetworkService<T> {
propagation_percentage, propagation_percentage,
}; };
let _ = spawn_service(handle, network_service)?; let _ = spawn_service(executor, network_service)?;
Ok((network_globals, network_send)) Ok((network_globals, network_send))
} }
} }
fn spawn_service<T: BeaconChainTypes>( fn spawn_service<T: BeaconChainTypes>(
handle: environment::TaskExecutor, executor: environment::TaskExecutor,
mut service: NetworkService<T>, mut service: NetworkService<T>,
) -> error::Result<()> { ) -> error::Result<()> {
let mut exit_rx = handle.exit(); let mut exit_rx = executor.exit();
let handle = handle.runtime_handle();
// spawn on the current executor // spawn on the current executor
handle.spawn(async move { executor.runtime_handle().spawn(async move {
loop { loop {
// build the futures to check simultaneously // build the futures to check simultaneously
tokio::select! { tokio::select! {

View File

@@ -52,7 +52,7 @@ pub struct NetworkInfo<T: BeaconChainTypes> {
// Allowing more than 7 arguments. // Allowing more than 7 arguments.
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn start_server<T: BeaconChainTypes>( pub fn start_server<T: BeaconChainTypes>(
handle: environment::TaskExecutor, executor: environment::TaskExecutor,
config: &Config, config: &Config,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
network_info: NetworkInfo<T>, network_info: NetworkInfo<T>,
@@ -100,7 +100,7 @@ pub fn start_server<T: BeaconChainTypes>(
let actual_listen_addr = server.local_addr(); let actual_listen_addr = server.local_addr();
// Build a channel to kill the HTTP server. // Build a channel to kill the HTTP server.
let exit = handle.exit(); let exit = executor.exit();
let inner_log = log.clone(); let inner_log = log.clone();
let server_exit = async move { let server_exit = async move {
let _ = exit.await; let _ = exit.await;
@@ -127,7 +127,7 @@ pub fn start_server<T: BeaconChainTypes>(
"port" => actual_listen_addr.port(), "port" => actual_listen_addr.port(),
); );
handle.runtime_handle().spawn(server_future); executor.runtime_handle().spawn(server_future);
Ok(actual_listen_addr) Ok(actual_listen_addr)
} }

View File

@@ -13,13 +13,11 @@ use tokio::time::{interval_at, Instant};
/// Spawns a timer service which periodically executes tasks for the beacon chain /// Spawns a timer service which periodically executes tasks for the beacon chain
pub fn spawn<T: BeaconChainTypes>( pub fn spawn<T: BeaconChainTypes>(
handle: environment::TaskExecutor, executor: environment::TaskExecutor,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
milliseconds_per_slot: u64, milliseconds_per_slot: u64,
log: slog::Logger, log: slog::Logger,
) -> Result<tokio::sync::oneshot::Sender<()>, &'static str> { ) -> Result<(), &'static str> {
let (exit_signal, exit) = tokio::sync::oneshot::channel();
let start_instant = Instant::now() let start_instant = Instant::now()
+ beacon_chain + beacon_chain
.slot_clock .slot_clock
@@ -34,8 +32,8 @@ pub fn spawn<T: BeaconChainTypes>(
} }
}; };
handle.spawn(timer_future, "timer_service"); executor.spawn(timer_future, "timer_service");
info!(log, "Timer service started"); info!(log, "Timer service started");
Ok(exit_signal) Ok(())
} }

View File

@@ -34,7 +34,7 @@ impl<T: EthSpec> WebSocketSender<T> {
} }
pub fn start_server<T: EthSpec>( pub fn start_server<T: EthSpec>(
handle: environment::TaskExecutor, executor: environment::TaskExecutor,
config: &Config, config: &Config,
log: &Logger, log: &Logger,
) -> Result<(WebSocketSender<T>, SocketAddr), String> { ) -> Result<(WebSocketSender<T>, SocketAddr), String> {
@@ -61,7 +61,7 @@ pub fn start_server<T: EthSpec>(
let broadcaster = server.broadcaster(); let broadcaster = server.broadcaster();
// Produce a signal/channel that can gracefully shutdown the websocket server. // Produce a signal/channel that can gracefully shutdown the websocket server.
let exit = handle.exit(); let exit = executor.exit();
let log_inner = log.clone(); let log_inner = log.clone();
let broadcaster_inner = server.broadcaster(); let broadcaster_inner = server.broadcaster();
let exit_future = async move { let exit_future = async move {
@@ -79,7 +79,7 @@ pub fn start_server<T: EthSpec>(
// Place a future on the handle that will shutdown the websocket server when the // Place a future on the handle that will shutdown the websocket server when the
// application exits. // application exits.
handle.runtime_handle().spawn(exit_future); executor.runtime_handle().spawn(exit_future);
let log_inner = log.clone(); let log_inner = log.clone();

View File

@@ -198,7 +198,7 @@ impl<E: EthSpec> EnvironmentBuilder<E> {
/// `Runtime`, instead it only has access to a `Runtime`. /// `Runtime`, instead it only has access to a `Runtime`.
#[derive(Clone)] #[derive(Clone)]
pub struct RuntimeContext<E: EthSpec> { pub struct RuntimeContext<E: EthSpec> {
pub runtime_handle: TaskExecutor, pub executor: TaskExecutor,
// TODO: remove if TaskExecutor contains log // TODO: remove if TaskExecutor contains log
pub log: Logger, pub log: Logger,
pub eth_spec_instance: E, pub eth_spec_instance: E,
@@ -211,7 +211,7 @@ impl<E: EthSpec> RuntimeContext<E> {
/// The generated service will have the `service_name` in all it's logs. /// The generated service will have the `service_name` in all it's logs.
pub fn service_context(&self, service_name: String) -> Self { pub fn service_context(&self, service_name: String) -> Self {
Self { Self {
runtime_handle: self.runtime_handle.clone(), executor: self.executor.clone(),
log: self.log.new(o!("service" => service_name)), log: self.log.new(o!("service" => service_name)),
eth_spec_instance: self.eth_spec_instance.clone(), eth_spec_instance: self.eth_spec_instance.clone(),
eth2_config: self.eth2_config.clone(), eth2_config: self.eth2_config.clone(),
@@ -302,7 +302,7 @@ impl<E: EthSpec> Environment<E> {
/// Returns a `Context` where no "service" has been added to the logger output. /// Returns a `Context` where no "service" has been added to the logger output.
pub fn core_context(&mut self) -> RuntimeContext<E> { pub fn core_context(&mut self) -> RuntimeContext<E> {
RuntimeContext { RuntimeContext {
runtime_handle: TaskExecutor { executor: TaskExecutor {
exit: self.exit.clone(), exit: self.exit.clone(),
handle: self.runtime().handle().clone(), handle: self.runtime().handle().clone(),
log: self.log.clone(), log: self.log.clone(),
@@ -316,7 +316,7 @@ impl<E: EthSpec> Environment<E> {
/// Returns a `Context` where the `service_name` is added to the logger output. /// Returns a `Context` where the `service_name` is added to the logger output.
pub fn service_context(&mut self, service_name: String) -> RuntimeContext<E> { pub fn service_context(&mut self, service_name: String) -> RuntimeContext<E> {
RuntimeContext { RuntimeContext {
runtime_handle: TaskExecutor { executor: TaskExecutor {
exit: self.exit.clone(), exit: self.exit.clone(),
handle: self.runtime().handle().clone(), handle: self.runtime().handle().clone(),
log: self.log.new(o!("service" => service_name.clone())), log: self.log.new(o!("service" => service_name.clone())),

View File

@@ -218,7 +218,7 @@ fn run<E: EthSpec>(
environment environment
.core_context() .core_context()
.runtime_handle .executor
.runtime_handle() .runtime_handle()
.enter(|| { .enter(|| {
validator validator

View File

@@ -3,7 +3,7 @@ use crate::{
validator_store::ValidatorStore, validator_store::ValidatorStore,
}; };
use environment::RuntimeContext; use environment::RuntimeContext;
use futures::{StreamExt, TryFutureExt}; use futures::StreamExt;
use remote_beacon_node::{PublishStatus, RemoteBeaconNode}; use remote_beacon_node::{PublishStatus, RemoteBeaconNode};
use slog::{crit, debug, info, trace}; use slog::{crit, debug, info, trace};
use slot_clock::SlotClock; use slot_clock::SlotClock;
@@ -140,7 +140,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
) )
}; };
let runtime_handle = self.context.runtime_handle.clone(); let executor = self.context.executor.clone();
let interval_fut = async move { let interval_fut = async move {
while interval.next().await.is_some() { while interval.next().await.is_some() {
@@ -161,7 +161,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
} }
}; };
runtime_handle.spawn(interval_fut, "attestation_service"); executor.spawn(interval_fut, "attestation_service");
Ok(()) Ok(())
} }
@@ -206,16 +206,13 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
.into_iter() .into_iter()
.for_each(|(committee_index, validator_duties)| { .for_each(|(committee_index, validator_duties)| {
// Spawn a separate task for each attestation. // Spawn a separate task for each attestation.
self.inner.context.runtime_handle.spawn( self.inner.context.executor.runtime_handle().spawn(
self.clone() self.clone().publish_attestations_and_aggregates(
.publish_attestations_and_aggregates( slot,
slot, committee_index,
committee_index, validator_duties,
validator_duties, aggregate_production_instant,
aggregate_production_instant, ),
)
.unwrap_or_else(|_| ()),
"duties_by_committee_index",
); );
}); });

View File

@@ -135,7 +135,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
) )
}; };
let runtime_handle = self.inner.context.runtime_handle.clone(); let executor = self.inner.context.executor.clone();
let interval_fut = async move { let interval_fut = async move {
while interval.next().await.is_some() { while interval.next().await.is_some() {
@@ -143,7 +143,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
} }
}; };
runtime_handle.spawn(interval_fut, "block_service"); executor.spawn(interval_fut, "block_service");
Ok(()) Ok(())
} }
@@ -184,7 +184,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
let service = self.clone(); let service = self.clone();
let log = log.clone(); let log = log.clone();
// TODO: run this task with a `spawn_without_name` // TODO: run this task with a `spawn_without_name`
self.inner.context.runtime_handle.spawn( self.inner.context.executor.spawn(
service service
.publish_block(slot, validator_pubkey) .publish_block(slot, validator_pubkey)
.map_err(move |e| { .map_err(move |e| {

View File

@@ -456,11 +456,11 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
// Run an immediate update before starting the updater service. // Run an immediate update before starting the updater service.
self.inner self.inner
.context .context
.runtime_handle .executor
.runtime_handle() .runtime_handle()
.spawn(self.clone().do_update()); .spawn(self.clone().do_update());
let runtime_handle = self.inner.context.runtime_handle.clone(); let executor = self.inner.context.executor.clone();
let interval_fut = async move { let interval_fut = async move {
while interval.next().await.is_some() { while interval.next().await.is_some() {
@@ -468,7 +468,7 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
} }
}; };
runtime_handle.spawn(interval_fut, "duties_service"); executor.spawn(interval_fut, "duties_service");
Ok(()) Ok(())
} }

View File

@@ -117,11 +117,11 @@ impl<T: SlotClock + 'static, E: EthSpec> ForkService<T, E> {
// Run an immediate update before starting the updater service. // Run an immediate update before starting the updater service.
self.inner self.inner
.context .context
.runtime_handle .executor
.runtime_handle() .runtime_handle()
.spawn(self.clone().do_update()); .spawn(self.clone().do_update());
let runtime_handle = self.inner.context.runtime_handle.clone(); let executor = self.inner.context.executor.clone();
let interval_fut = async move { let interval_fut = async move {
while interval.next().await.is_some() { while interval.next().await.is_some() {
@@ -129,7 +129,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ForkService<T, E> {
} }
}; };
runtime_handle.spawn(interval_fut, "fork_service"); executor.spawn(interval_fut, "fork_service");
Ok(()) Ok(())
} }

View File

@@ -8,7 +8,7 @@ use types::EthSpec;
/// Spawns a notifier service which periodically logs information about the node. /// Spawns a notifier service which periodically logs information about the node.
pub fn spawn_notifier<T: EthSpec>(client: &ProductionValidatorClient<T>) -> Result<(), String> { pub fn spawn_notifier<T: EthSpec>(client: &ProductionValidatorClient<T>) -> Result<(), String> {
let context = client.context.service_context("notifier".into()); let context = client.context.service_context("notifier".into());
let runtime_handle = context.runtime_handle.clone(); let executor = context.executor.clone();
let duties_service = client.duties_service.clone(); let duties_service = client.duties_service.clone();
let allow_unsynced_beacon_node = client.config.allow_unsynced_beacon_node; let allow_unsynced_beacon_node = client.config.allow_unsynced_beacon_node;
@@ -81,6 +81,6 @@ pub fn spawn_notifier<T: EthSpec>(client: &ProductionValidatorClient<T>) -> Resu
} }
}; };
runtime_handle.spawn(interval_fut, "validator_notifier"); executor.spawn(interval_fut, "validator_notifier");
Ok(()) Ok(())
} }