Beacon chain tasks use task executor

This commit is contained in:
pawan
2020-05-20 12:15:42 +05:30
parent 1f9e1c4808
commit 4ad39716be
19 changed files with 124 additions and 172 deletions

View File

@@ -46,7 +46,8 @@ tempfile = "3.1.0"
bitvec = "0.17.4"
bls = { path = "../../crypto/bls" }
safe_arith = { path = "../../consensus/safe_arith" }
environment = { path = "../../lighthouse/environment" }
[dev-dependencies]
lazy_static = "1.4.0"
environment = { path = "../../lighthouse/environment" }

View File

@@ -1,4 +1,5 @@
use crate::metrics;
use environment::TaskExecutor;
use eth1::{Config as Eth1Config, Eth1Block, Service as HttpService};
use eth2_hashing::hash;
use slog::{debug, error, trace, Logger};
@@ -285,8 +286,8 @@ impl<T: EthSpec, S: Store<T>> CachingEth1Backend<T, S> {
}
/// Starts the routine which connects to the external eth1 node and updates the caches.
pub fn start(&self, handle: &tokio::runtime::Handle, exit: tokio::sync::oneshot::Receiver<()>) {
HttpService::auto_update(self.core.clone(), handle, exit);
pub fn start(&self, handle: TaskExecutor) {
HttpService::auto_update(self.core.clone(), handle);
}
/// Instantiates `self` from an existing service.

View File

@@ -50,7 +50,6 @@ pub struct ClientBuilder<T: BeaconChainTypes> {
beacon_chain_builder: Option<BeaconChainBuilder<T>>,
beacon_chain: Option<Arc<BeaconChain<T>>>,
eth1_service: Option<Eth1Service>,
exit_channels: Vec<tokio::sync::oneshot::Sender<()>>,
event_handler: Option<T::EventHandler>,
network_globals: Option<Arc<NetworkGlobals<T::EthSpec>>>,
network_send: Option<UnboundedSender<NetworkMessage<T::EthSpec>>>,
@@ -84,7 +83,6 @@ where
beacon_chain_builder: None,
beacon_chain: None,
eth1_service: None,
exit_channels: vec![],
event_handler: None,
network_globals: None,
network_send: None,
@@ -223,13 +221,12 @@ where
.ok_or_else(|| "network requires a runtime_context")?
.clone();
let (network_globals, network_send, network_exit) =
NetworkService::start(beacon_chain, config, &context.runtime_handle, context.log)
let (network_globals, network_send) =
NetworkService::start(beacon_chain, config, context.runtime_handle, context.log)
.map_err(|e| format!("Failed to start network: {:?}", e))?;
self.network_globals = Some(network_globals);
self.network_send = Some(network_send);
self.exit_channels.push(network_exit);
Ok(self)
}
@@ -251,16 +248,14 @@ where
.ok_or_else(|| "node timer requires a chain spec".to_string())?
.milliseconds_per_slot;
let timer_exit = timer::spawn(
&context.runtime_handle,
let _ = timer::spawn(
context.runtime_handle,
beacon_chain,
milliseconds_per_slot,
context.log.clone(),
)
.map_err(|e| format!("Unable to start node timer: {}", e))?;
self.exit_channels.push(timer_exit);
Ok(self)
}
@@ -294,8 +289,8 @@ where
};
let log = context.log.clone();
let (exit_channel, listening_addr) = rest_api::start_server(
&context.runtime_handle,
let listening_addr = rest_api::start_server(
context.runtime_handle,
&client_config.rest_api,
beacon_chain,
network_info,
@@ -310,7 +305,6 @@ where
)
.map_err(|e| format!("Failed to start HTTP API: {:?}", e))?;
self.exit_channels.push(exit_channel);
self.http_listen_addr = Some(listening_addr);
Ok(self)
@@ -337,8 +331,8 @@ where
.ok_or_else(|| "slot_notifier requires a chain spec".to_string())?
.milliseconds_per_slot;
let exit_channel = spawn_notifier(
&context.runtime_handle,
let _ = spawn_notifier(
context.runtime_handle,
beacon_chain,
network_globals,
milliseconds_per_slot,
@@ -346,8 +340,6 @@ where
)
.map_err(|e| format!("Unable to start slot notifier: {}", e))?;
self.exit_channels.push(exit_channel);
Ok(self)
}
@@ -364,7 +356,6 @@ where
network_globals: self.network_globals,
http_listen_addr: self.http_listen_addr,
websocket_listen_addr: self.websocket_listen_addr,
_exit_channels: self.exit_channels,
}
}
}
@@ -435,21 +426,14 @@ where
.ok_or_else(|| "websocket_event_handler requires a runtime_context")?
.service_context("ws".into());
let (sender, exit_channel, listening_addr): (
WebSocketSender<TEthSpec>,
Option<_>,
Option<_>,
) = if config.enabled {
let (sender, exit, listening_addr) =
websocket_server::start_server(&context.runtime_handle, &config, &context.log)?;
(sender, Some(exit), Some(listening_addr))
let (sender, listening_addr): (WebSocketSender<TEthSpec>, Option<_>) = if config.enabled {
let (sender, listening_addr) =
websocket_server::start_server(context.runtime_handle, &config, &context.log)?;
(sender, Some(listening_addr))
} else {
(WebSocketSender::dummy(), None, None)
(WebSocketSender::dummy(), None)
};
if let Some(channel) = exit_channel {
self.exit_channels.push(channel);
}
self.event_handler = Some(sender);
self.websocket_listen_addr = listening_addr;
@@ -653,14 +637,8 @@ where
self.eth1_service = None;
let exit = {
let (tx, rx) = tokio::sync::oneshot::channel();
self.exit_channels.push(tx);
rx
};
// Starts the service that connects to an eth1 node and periodically updates caches.
backend.start(&context.runtime_handle, exit);
backend.start(context.runtime_handle);
self.beacon_chain_builder = Some(beacon_chain_builder.eth1_backend(Some(backend)));

View File

@@ -25,8 +25,6 @@ pub struct Client<T: BeaconChainTypes> {
network_globals: Option<Arc<NetworkGlobals<T::EthSpec>>>,
http_listen_addr: Option<SocketAddr>,
websocket_listen_addr: Option<SocketAddr>,
/// Exit channels will complete/error when dropped, causing each service to exit gracefully.
_exit_channels: Vec<tokio::sync::oneshot::Sender<()>>,
}
impl<T: BeaconChainTypes> Client<T> {

View File

@@ -25,12 +25,12 @@ const SPEEDO_OBSERVATIONS: usize = 4;
/// Spawns a notifier service which periodically logs information about the node.
pub fn spawn_notifier<T: BeaconChainTypes>(
handle: &tokio::runtime::Handle,
handle: environment::TaskExecutor,
beacon_chain: Arc<BeaconChain<T>>,
network: Arc<NetworkGlobals<T::EthSpec>>,
milliseconds_per_slot: u64,
log: slog::Logger,
) -> Result<tokio::sync::oneshot::Sender<()>, String> {
) -> Result<(), String> {
let log_1 = log.clone();
let slot_duration = Duration::from_millis(milliseconds_per_slot);
let duration_to_next_slot = beacon_chain
@@ -148,20 +148,10 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
Ok::<(), ()>(())
};
let (exit_signal, exit) = tokio::sync::oneshot::channel();
let exit_future = async move {
let _ = exit.await.ok();
info!(log_1, "Notifier service shutdown");
};
// run the notifier on the current executor
handle.spawn(futures::future::select(
Box::pin(interval_future),
Box::pin(exit_future),
));
handle.spawn(interval_future.unwrap_or_else(|_| ()), "beacon_notifier");
Ok(exit_signal)
Ok(())
}
/// Returns the peer count, returning something helpful if it's `usize::max_value` (effectively a

View File

@@ -6,7 +6,6 @@ edition = "2018"
[dev-dependencies]
eth1_test_rig = { path = "../../testing/eth1_test_rig" }
environment = { path = "../../lighthouse/environment" }
toml = "0.5.6"
web3 = "0.10.0"
sloggers = "1.0.0"
@@ -30,3 +29,4 @@ state_processing = { path = "../../consensus/state_processing" }
libflate = "1.0.0"
lighthouse_metrics = { path = "../../common/lighthouse_metrics"}
lazy_static = "1.4.0"
environment = { path = "../../lighthouse/environment" }

View File

@@ -283,13 +283,8 @@ impl Service {
/// - Err(_) if there is an error.
///
/// Emits logs for debugging and errors.
pub fn auto_update(
service: Self,
handle: &tokio::runtime::Handle,
exit: tokio::sync::oneshot::Receiver<()>,
) {
pub fn auto_update(service: Self, handle: environment::TaskExecutor) {
let update_interval = Duration::from_millis(service.config().auto_update_interval_millis);
let log = service.log.clone();
let mut interval = interval_at(Instant::now(), update_interval);
@@ -301,14 +296,7 @@ impl Service {
}
};
let exit_future = async move {
let _ = exit.await.ok();
info!(log, "Eth1 service shutdown");
};
let future = futures::future::select(Box::pin(update_future), Box::pin(exit_future));
handle.spawn(future);
handle.spawn(update_future, "eth1_service");
}
async fn do_update(service: Self, update_interval: Duration) -> Result<(), ()> {

View File

@@ -84,7 +84,7 @@ pub struct Service<TSpec: EthSpec> {
impl<TSpec: EthSpec> Service<TSpec> {
pub fn new(
handle: &tokio::runtime::Handle,
handle: tokio::runtime::Handle,
config: &NetworkConfig,
enr_fork_id: EnrForkId,
log: &slog::Logger,
@@ -131,7 +131,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
}
SwarmBuilder::new(transport, behaviour, local_peer_id.clone())
.peer_connection_limit(MAX_CONNECTIONS_PER_PEER)
.executor(Box::new(Executor(handle.clone())))
.executor(Box::new(Executor(handle)))
.build()
};

View File

@@ -34,3 +34,4 @@ fnv = "1.0.6"
rlp = "0.4.5"
lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
environment = { path = "../../lighthouse/environment" }

View File

@@ -58,7 +58,7 @@ impl<T: BeaconChainTypes> Router<T> {
beacon_chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
runtime_handle: &tokio::runtime::Handle,
runtime_handle: environment::TaskExecutor,
log: slog::Logger,
) -> error::Result<mpsc::UnboundedSender<RouterMessage<T::EthSpec>>> {
let message_handler_log = log.new(o!("service"=> "router"));
@@ -68,7 +68,8 @@ impl<T: BeaconChainTypes> Router<T> {
// Initialise a message instance, which itself spawns the syncing thread.
let processor = Processor::new(
runtime_handle,
// TODO: spawn_blocking here
&runtime_handle.runtime_handle(),
beacon_chain,
network_globals.clone(),
network_send.clone(),
@@ -84,12 +85,15 @@ impl<T: BeaconChainTypes> Router<T> {
};
// spawn handler task and move the message handler instance into the spawned thread
runtime_handle.spawn(async move {
handler_recv
.for_each(move |msg| future::ready(handler.handle_message(msg)))
.await;
debug!(log, "Network message handler terminated.");
});
runtime_handle.spawn(
async move {
handler_recv
.for_each(move |msg| future::ready(handler.handle_message(msg)))
.await;
debug!(log, "Network message handler terminated.");
},
"router_service",
);
Ok(handler_send)
}

View File

@@ -53,12 +53,11 @@ impl<T: BeaconChainTypes> NetworkService<T> {
pub fn start(
beacon_chain: Arc<BeaconChain<T>>,
config: &NetworkConfig,
runtime_handle: &Handle,
handle: environment::TaskExecutor,
network_log: slog::Logger,
) -> error::Result<(
Arc<NetworkGlobals<T::EthSpec>>,
mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
oneshot::Sender<()>,
)> {
// build the network channel
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage<T::EthSpec>>();
@@ -75,7 +74,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
// launch libp2p service
let (network_globals, mut libp2p) =
LibP2PService::new(runtime_handle, config, enr_fork_id, &network_log)?;
LibP2PService::new(handle.runtime_handle(), config, enr_fork_id, &network_log)?;
for enr in load_dht::<T::Store, T::EthSpec>(store.clone()) {
libp2p.swarm.add_enr(enr);
@@ -88,7 +87,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
beacon_chain.clone(),
network_globals.clone(),
network_send.clone(),
runtime_handle,
handle.clone(),
network_log.clone(),
)?;
@@ -111,17 +110,18 @@ impl<T: BeaconChainTypes> NetworkService<T> {
propagation_percentage,
};
let network_exit = spawn_service(runtime_handle, network_service)?;
let _ = spawn_service(handle, network_service)?;
Ok((network_globals, network_send, network_exit))
Ok((network_globals, network_send))
}
}
fn spawn_service<T: BeaconChainTypes>(
handle: &tokio::runtime::Handle,
handle: environment::TaskExecutor,
mut service: NetworkService<T>,
) -> error::Result<tokio::sync::oneshot::Sender<()>> {
let (network_exit, mut exit_rx) = tokio::sync::oneshot::channel();
) -> error::Result<()> {
let mut exit_rx = handle.exit();
let handle = handle.runtime_handle();
// spawn on the current executor
handle.spawn(async move {
@@ -364,7 +364,7 @@ fn spawn_service<T: BeaconChainTypes>(
}
});
Ok(network_exit)
Ok(())
}
/// Returns a `Delay` that triggers shortly after the next change in the beacon chain fork version.

View File

@@ -36,6 +36,7 @@ parking_lot = "0.10.2"
futures = "0.3.5"
operation_pool = { path = "../operation_pool" }
rayon = "1.3.0"
environment = { path = "../../lighthouse/environment" }
[dev-dependencies]
remote_beacon_node = { path = "../../common/remote_beacon_node" }

View File

@@ -52,7 +52,7 @@ pub struct NetworkInfo<T: BeaconChainTypes> {
// Allowing more than 7 arguments.
#[allow(clippy::too_many_arguments)]
pub fn start_server<T: BeaconChainTypes>(
handle: &Handle,
handle: environment::TaskExecutor,
config: &Config,
beacon_chain: Arc<BeaconChain<T>>,
network_info: NetworkInfo<T>,
@@ -60,7 +60,7 @@ pub fn start_server<T: BeaconChainTypes>(
freezer_db_path: PathBuf,
eth2_config: Eth2Config,
log: slog::Logger,
) -> Result<(oneshot::Sender<()>, SocketAddr), hyper::Error> {
) -> Result<SocketAddr, hyper::Error> {
let inner_log = log.clone();
let eth2_config = Arc::new(eth2_config);
@@ -100,7 +100,7 @@ pub fn start_server<T: BeaconChainTypes>(
let actual_listen_addr = server.local_addr();
// Build a channel to kill the HTTP server.
let (exit_signal, exit) = oneshot::channel::<()>();
let exit = handle.exit();
let inner_log = log.clone();
let server_exit = async move {
let _ = exit.await;
@@ -127,9 +127,9 @@ pub fn start_server<T: BeaconChainTypes>(
"port" => actual_listen_addr.port(),
);
handle.spawn(server_future);
handle.runtime_handle().spawn(server_future);
Ok((exit_signal, actual_listen_addr))
Ok(actual_listen_addr)
}
#[derive(Clone)]

View File

@@ -12,3 +12,4 @@ tokio = { version = "0.2.21", features = ["full"] }
slog = "2.5.2"
parking_lot = "0.10.2"
futures = "0.3.5"
environment = { path = "../../lighthouse/environment" }

View File

@@ -13,7 +13,7 @@ use tokio::time::{interval_at, Instant};
/// Spawns a timer service which periodically executes tasks for the beacon chain
pub fn spawn<T: BeaconChainTypes>(
handle: &Handle,
handle: environment::TaskExecutor,
beacon_chain: Arc<BeaconChain<T>>,
milliseconds_per_slot: u64,
log: slog::Logger,
@@ -34,14 +34,7 @@ pub fn spawn<T: BeaconChainTypes>(
}
};
let log_1 = log.clone();
let exit_future = async move {
let _ = exit.await.ok();
info!(log_1, "Timer service shutdown");
};
let future = futures::future::select(Box::pin(timer_future), Box::pin(exit_future));
handle.spawn(future);
handle.spawn(timer_future, "timer_service");
info!(log, "Timer service started");
Ok(exit_signal)

View File

@@ -15,3 +15,4 @@ slog = "2.5.2"
tokio = { version = "0.2.21", features = ["full"] }
types = { path = "../../consensus/types" }
ws = "0.9.1"
environment = { path = "../../lighthouse/environment" }

View File

@@ -34,17 +34,10 @@ impl<T: EthSpec> WebSocketSender<T> {
}
pub fn start_server<T: EthSpec>(
handle: &tokio::runtime::Handle,
handle: environment::TaskExecutor,
config: &Config,
log: &Logger,
) -> Result<
(
WebSocketSender<T>,
tokio::sync::oneshot::Sender<()>,
SocketAddr,
),
String,
> {
) -> Result<(WebSocketSender<T>, SocketAddr), String> {
let server_string = format!("{}:{}", config.listen_address, config.port);
// Create a server that simply ignores any incoming messages.
@@ -68,31 +61,26 @@ pub fn start_server<T: EthSpec>(
let broadcaster = server.broadcaster();
// Produce a signal/channel that can gracefully shutdown the websocket server.
let exit_channel = {
let (exit_channel, exit) = tokio::sync::oneshot::channel();
let log_inner = log.clone();
let broadcaster_inner = server.broadcaster();
let exit_future = async move {
let _ = exit.await;
if let Err(e) = broadcaster_inner.shutdown() {
warn!(
log_inner,
"Websocket server errored on shutdown";
"error" => format!("{:?}", e)
);
} else {
info!(log_inner, "Websocket server shutdown");
}
};
// Place a future on the handle that will shutdown the websocket server when the
// application exits.
handle.spawn(exit_future);
exit_channel
let exit = handle.exit();
let log_inner = log.clone();
let broadcaster_inner = server.broadcaster();
let exit_future = async move {
let _ = exit.await;
if let Err(e) = broadcaster_inner.shutdown() {
warn!(
log_inner,
"Websocket server errored on shutdown";
"error" => format!("{:?}", e)
);
} else {
info!(log_inner, "Websocket server shutdown");
}
};
// Place a future on the handle that will shutdown the websocket server when the
// application exits.
handle.runtime_handle().spawn(exit_future);
let log_inner = log.clone();
let _ = std::thread::spawn(move || match server.run() {
@@ -123,7 +111,6 @@ pub fn start_server<T: EthSpec>(
sender: Some(broadcaster),
_phantom: PhantomData,
},
exit_channel,
actual_listen_addr,
))
}