mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-10 04:01:51 +00:00
Improve tokio task execution (#1181)
* Add logging on shutdown
* Replace tokio::spawn with handle.spawn
* Upgrade tokio
* Add a task executor
* Beacon chain tasks use task executor
* Validator client tasks use task executor
* Rename runtime_handle to executor
* Add duration histograms; minor fixes
* Cleanup
* Fix logs
* Fix tests
* Remove random file
* Get enr dependency instead of libp2p
* Address some review comments
* Libp2p takes a TaskExecutor
* Ugly fix libp2p tests
* Move TaskExecutor to own file
* Upgrade Dockerfile rust version
* Minor fixes
* Revert "Ugly fix libp2p tests"
This reverts commit 58d4bb690f.
* Pretty fix libp2p tests
* Add spawn_without_exit; change Counter to Gauge
* Tidy
* Move log from RuntimeContext to TaskExecutor
* Fix errors
* Replace histogram with int_gauge for async tasks
* Fix todo
* Fix memory leak in test by exiting all spawned tasks at the end
This commit is contained in:
@@ -3,8 +3,7 @@ use crate::{
|
||||
validator_store::ValidatorStore,
|
||||
};
|
||||
use environment::RuntimeContext;
|
||||
use exit_future::Signal;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use futures::StreamExt;
|
||||
use remote_beacon_node::{PublishStatus, RemoteBeaconNode};
|
||||
use slog::{crit, debug, info, trace};
|
||||
use slot_clock::SlotClock;
|
||||
@@ -118,8 +117,8 @@ impl<T, E: EthSpec> Deref for AttestationService<T, E> {
|
||||
|
||||
impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
/// Starts the service which periodically produces attestations.
|
||||
pub fn start_update_service(self, spec: &ChainSpec) -> Result<Signal, String> {
|
||||
let log = self.context.log.clone();
|
||||
pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
|
||||
let log = self.context.log().clone();
|
||||
|
||||
let slot_duration = Duration::from_millis(spec.milliseconds_per_slot);
|
||||
let duration_to_next_slot = self
|
||||
@@ -141,13 +140,11 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
)
|
||||
};
|
||||
|
||||
let (exit_signal, exit_fut) = exit_future::signal();
|
||||
|
||||
let runtime_handle = self.context.runtime_handle.clone();
|
||||
let executor = self.context.executor.clone();
|
||||
|
||||
let interval_fut = async move {
|
||||
while interval.next().await.is_some() {
|
||||
let log = &self.context.log;
|
||||
let log = self.context.log();
|
||||
|
||||
if let Err(e) = self.spawn_attestation_tasks(slot_duration) {
|
||||
crit!(
|
||||
@@ -164,13 +161,8 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
}
|
||||
};
|
||||
|
||||
let future = futures::future::select(
|
||||
Box::pin(interval_fut),
|
||||
exit_fut.map(move |_| info!(log, "Shutdown complete")),
|
||||
);
|
||||
runtime_handle.spawn(future);
|
||||
|
||||
Ok(exit_signal)
|
||||
executor.spawn(interval_fut, "attestation_service");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// For each each required attestation, spawn a new task that downloads, signs and uploads the
|
||||
@@ -214,7 +206,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
.into_iter()
|
||||
.for_each(|(committee_index, validator_duties)| {
|
||||
// Spawn a separate task for each attestation.
|
||||
self.inner.context.runtime_handle.spawn(
|
||||
self.inner.context.executor.runtime_handle().spawn(
|
||||
self.clone().publish_attestations_and_aggregates(
|
||||
slot,
|
||||
committee_index,
|
||||
@@ -243,7 +235,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
validator_duties: Vec<DutyAndProof>,
|
||||
aggregate_production_instant: Instant,
|
||||
) -> Result<(), ()> {
|
||||
let log = &self.context.log;
|
||||
let log = self.context.log();
|
||||
|
||||
// There's not need to produce `Attestation` or `SignedAggregateAndProof` if we do not have
|
||||
// any validators for the given `slot` and `committee_index`.
|
||||
@@ -314,7 +306,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
committee_index: CommitteeIndex,
|
||||
validator_duties: &[DutyAndProof],
|
||||
) -> Result<Option<Attestation<E>>, String> {
|
||||
let log = &self.context.log;
|
||||
let log = self.context.log();
|
||||
|
||||
if validator_duties.is_empty() {
|
||||
return Ok(None);
|
||||
@@ -448,7 +440,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
attestation: Attestation<E>,
|
||||
validator_duties: &[DutyAndProof],
|
||||
) -> Result<(), String> {
|
||||
let log = &self.context.log;
|
||||
let log = self.context.log();
|
||||
|
||||
let aggregated_attestation = self
|
||||
.beacon_node
|
||||
@@ -548,6 +540,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use futures::future::FutureExt;
|
||||
use parking_lot::RwLock;
|
||||
|
||||
/// This test is to ensure that a `tokio_timer::Delay` with an instant in the past will still
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use crate::{duties_service::DutiesService, validator_store::ValidatorStore};
|
||||
use environment::RuntimeContext;
|
||||
use exit_future::Signal;
|
||||
use futures::{FutureExt, StreamExt, TryFutureExt};
|
||||
use futures::{StreamExt, TryFutureExt};
|
||||
use remote_beacon_node::{PublishStatus, RemoteBeaconNode};
|
||||
use slog::{crit, error, info, trace};
|
||||
use slot_clock::SlotClock;
|
||||
@@ -113,8 +112,8 @@ impl<T, E: EthSpec> Deref for BlockService<T, E> {
|
||||
|
||||
impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
|
||||
/// Starts the service that periodically attempts to produce blocks.
|
||||
pub fn start_update_service(self, spec: &ChainSpec) -> Result<Signal, String> {
|
||||
let log = self.context.log.clone();
|
||||
pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
|
||||
let log = self.context.log().clone();
|
||||
|
||||
let duration_to_next_slot = self
|
||||
.slot_clock
|
||||
@@ -136,7 +135,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
|
||||
)
|
||||
};
|
||||
|
||||
let runtime_handle = self.inner.context.runtime_handle.clone();
|
||||
let executor = self.inner.context.executor.clone();
|
||||
|
||||
let interval_fut = async move {
|
||||
while interval.next().await.is_some() {
|
||||
@@ -144,20 +143,14 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
|
||||
}
|
||||
};
|
||||
|
||||
let (exit_signal, exit_fut) = exit_future::signal();
|
||||
executor.spawn(interval_fut, "block_service");
|
||||
|
||||
let future = futures::future::select(
|
||||
Box::pin(interval_fut),
|
||||
exit_fut.map(move |_| info!(log, "Shutdown complete")),
|
||||
);
|
||||
runtime_handle.spawn(future);
|
||||
|
||||
Ok(exit_signal)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Attempt to produce a block for any block producers in the `ValidatorStore`.
|
||||
async fn do_update(&self) -> Result<(), ()> {
|
||||
let log = &self.context.log;
|
||||
let log = self.context.log();
|
||||
|
||||
let slot = self.slot_clock.now().ok_or_else(move || {
|
||||
crit!(log, "Duties manager failed to read slot clock");
|
||||
@@ -190,7 +183,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
|
||||
iter.for_each(|validator_pubkey| {
|
||||
let service = self.clone();
|
||||
let log = log.clone();
|
||||
self.inner.context.runtime_handle.spawn(
|
||||
self.inner.context.executor.runtime_handle().spawn(
|
||||
service
|
||||
.publish_block(slot, validator_pubkey)
|
||||
.map_err(move |e| {
|
||||
@@ -208,7 +201,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
|
||||
|
||||
/// Produce a block at the given slot for validator_pubkey
|
||||
async fn publish_block(self, slot: Slot, validator_pubkey: PublicKey) -> Result<(), String> {
|
||||
let log = &self.context.log;
|
||||
let log = self.context.log();
|
||||
|
||||
let current_slot = self
|
||||
.slot_clock
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
use crate::{is_synced::is_synced, validator_store::ValidatorStore};
|
||||
use environment::RuntimeContext;
|
||||
use exit_future::Signal;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use futures::StreamExt;
|
||||
use parking_lot::RwLock;
|
||||
use remote_beacon_node::{PublishStatus, RemoteBeaconNode};
|
||||
use rest_types::{ValidatorDuty, ValidatorDutyBytes, ValidatorSubscription};
|
||||
use slog::{debug, error, info, trace, warn};
|
||||
use slog::{debug, error, trace, warn};
|
||||
use slot_clock::SlotClock;
|
||||
use std::collections::HashMap;
|
||||
use std::convert::TryInto;
|
||||
@@ -439,9 +438,7 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
|
||||
}
|
||||
|
||||
/// Start the service that periodically polls the beacon node for validator duties.
|
||||
pub fn start_update_service(self, spec: &ChainSpec) -> Result<Signal, String> {
|
||||
let log = self.context.log.clone();
|
||||
|
||||
pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
|
||||
let duration_to_next_slot = self
|
||||
.slot_clock
|
||||
.duration_to_next_slot()
|
||||
@@ -456,15 +453,14 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
|
||||
)
|
||||
};
|
||||
|
||||
let (exit_signal, exit_fut) = exit_future::signal();
|
||||
|
||||
// Run an immediate update before starting the updater service.
|
||||
self.inner
|
||||
.context
|
||||
.runtime_handle
|
||||
.executor
|
||||
.runtime_handle()
|
||||
.spawn(self.clone().do_update());
|
||||
|
||||
let runtime_handle = self.inner.context.runtime_handle.clone();
|
||||
let executor = self.inner.context.executor.clone();
|
||||
|
||||
let interval_fut = async move {
|
||||
while interval.next().await.is_some() {
|
||||
@@ -472,18 +468,14 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
|
||||
}
|
||||
};
|
||||
|
||||
let future = futures::future::select(
|
||||
Box::pin(interval_fut),
|
||||
exit_fut.map(move |_| info!(log, "Shutdown complete")),
|
||||
);
|
||||
runtime_handle.spawn(future);
|
||||
executor.spawn(interval_fut, "duties_service");
|
||||
|
||||
Ok(exit_signal)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Attempt to download the duties of all managed validators for this epoch and the next.
|
||||
async fn do_update(self) -> Result<(), ()> {
|
||||
let log = &self.context.log;
|
||||
let log = self.context.log();
|
||||
|
||||
if !is_synced(&self.beacon_node, &self.slot_clock, None).await
|
||||
&& !self.allow_unsynced_beacon_node
|
||||
@@ -550,7 +542,7 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
|
||||
.await
|
||||
.map_err(move |e| format!("Failed to get duties for epoch {}: {:?}", epoch, e))?;
|
||||
|
||||
let log = self.context.log.clone();
|
||||
let log = self.context.log().clone();
|
||||
|
||||
let mut new_validator = 0;
|
||||
let mut new_epoch = 0;
|
||||
@@ -652,7 +644,7 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
|
||||
)
|
||||
}
|
||||
|
||||
let log = self.context.log.clone();
|
||||
let log = self.context.log().clone();
|
||||
let count = validator_subscriptions.len();
|
||||
|
||||
if count == 0 {
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
use environment::RuntimeContext;
|
||||
use exit_future::Signal;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use futures::StreamExt;
|
||||
use parking_lot::RwLock;
|
||||
use remote_beacon_node::RemoteBeaconNode;
|
||||
use slog::{debug, info, trace};
|
||||
use slog::{debug, trace};
|
||||
use slot_clock::SlotClock;
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
@@ -100,9 +99,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ForkService<T, E> {
|
||||
}
|
||||
|
||||
/// Starts the service that periodically polls for the `Fork`.
|
||||
pub fn start_update_service(self, spec: &ChainSpec) -> Result<Signal, String> {
|
||||
let log = self.context.log.clone();
|
||||
|
||||
pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
|
||||
let duration_to_next_epoch = self
|
||||
.slot_clock
|
||||
.duration_to_next_epoch(E::slots_per_epoch())
|
||||
@@ -117,15 +114,14 @@ impl<T: SlotClock + 'static, E: EthSpec> ForkService<T, E> {
|
||||
)
|
||||
};
|
||||
|
||||
let (exit_signal, exit_fut) = exit_future::signal();
|
||||
|
||||
// Run an immediate update before starting the updater service.
|
||||
self.inner
|
||||
.context
|
||||
.runtime_handle
|
||||
.executor
|
||||
.runtime_handle()
|
||||
.spawn(self.clone().do_update());
|
||||
|
||||
let runtime_handle = self.inner.context.runtime_handle.clone();
|
||||
let executor = self.inner.context.executor.clone();
|
||||
|
||||
let interval_fut = async move {
|
||||
while interval.next().await.is_some() {
|
||||
@@ -133,18 +129,14 @@ impl<T: SlotClock + 'static, E: EthSpec> ForkService<T, E> {
|
||||
}
|
||||
};
|
||||
|
||||
let future = futures::future::select(
|
||||
Box::pin(interval_fut),
|
||||
exit_fut.map(move |_| info!(log, "Shutdown complete")),
|
||||
);
|
||||
runtime_handle.spawn(future);
|
||||
executor.spawn(interval_fut, "fork_service");
|
||||
|
||||
Ok(exit_signal)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Attempts to download the `Fork` from the server.
|
||||
async fn do_update(self) -> Result<(), ()> {
|
||||
let log = &self.context.log;
|
||||
let log = self.context.log();
|
||||
|
||||
let fork = self
|
||||
.inner
|
||||
|
||||
@@ -17,7 +17,6 @@ use clap::ArgMatches;
|
||||
use config::SLASHING_PROTECTION_FILENAME;
|
||||
use duties_service::{DutiesService, DutiesServiceBuilder};
|
||||
use environment::RuntimeContext;
|
||||
use exit_future::Signal;
|
||||
use fork_service::{ForkService, ForkServiceBuilder};
|
||||
use notifier::spawn_notifier;
|
||||
use remote_beacon_node::RemoteBeaconNode;
|
||||
@@ -41,7 +40,6 @@ pub struct ProductionValidatorClient<T: EthSpec> {
|
||||
fork_service: ForkService<SystemTimeSlotClock, T>,
|
||||
block_service: BlockService<SystemTimeSlotClock, T>,
|
||||
attestation_service: AttestationService<SystemTimeSlotClock, T>,
|
||||
exit_signals: Vec<Signal>,
|
||||
config: Config,
|
||||
}
|
||||
|
||||
@@ -60,10 +58,10 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
|
||||
/// Instantiates the validator client, _without_ starting the timers to trigger block
|
||||
/// and attestation production.
|
||||
pub async fn new(mut context: RuntimeContext<T>, config: Config) -> Result<Self, String> {
|
||||
let log_1 = context.log.clone();
|
||||
let log_2 = context.log.clone();
|
||||
let log_3 = context.log.clone();
|
||||
let log_4 = context.log.clone();
|
||||
let log_1 = context.log().clone();
|
||||
let log_2 = context.log().clone();
|
||||
let log_3 = context.log().clone();
|
||||
let log_4 = context.log().clone();
|
||||
|
||||
info!(
|
||||
log_1,
|
||||
@@ -217,46 +215,32 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
|
||||
fork_service,
|
||||
block_service,
|
||||
attestation_service,
|
||||
exit_signals: vec![],
|
||||
config,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn start_service(&mut self) -> Result<(), String> {
|
||||
let duties_exit = self
|
||||
.duties_service
|
||||
self.duties_service
|
||||
.clone()
|
||||
.start_update_service(&self.context.eth2_config.spec)
|
||||
.map_err(|e| format!("Unable to start duties service: {}", e))?;
|
||||
|
||||
let fork_exit = self
|
||||
.fork_service
|
||||
self.fork_service
|
||||
.clone()
|
||||
.start_update_service(&self.context.eth2_config.spec)
|
||||
.map_err(|e| format!("Unable to start fork service: {}", e))?;
|
||||
|
||||
let block_exit = self
|
||||
.block_service
|
||||
self.block_service
|
||||
.clone()
|
||||
.start_update_service(&self.context.eth2_config.spec)
|
||||
.map_err(|e| format!("Unable to start block service: {}", e))?;
|
||||
|
||||
let attestation_exit = self
|
||||
.attestation_service
|
||||
self.attestation_service
|
||||
.clone()
|
||||
.start_update_service(&self.context.eth2_config.spec)
|
||||
.map_err(|e| format!("Unable to start attestation service: {}", e))?;
|
||||
|
||||
let notifier_exit =
|
||||
spawn_notifier(self).map_err(|e| format!("Failed to start notifier: {}", e))?;
|
||||
|
||||
self.exit_signals = vec![
|
||||
duties_exit,
|
||||
fork_exit,
|
||||
block_exit,
|
||||
attestation_exit,
|
||||
notifier_exit,
|
||||
];
|
||||
spawn_notifier(self).map_err(|e| format!("Failed to start notifier: {}", e))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,16 +1,14 @@
|
||||
use crate::{is_synced::is_synced, ProductionValidatorClient};
|
||||
use exit_future::Signal;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use futures::StreamExt;
|
||||
use slog::{error, info};
|
||||
use slot_clock::SlotClock;
|
||||
use tokio::time::{interval_at, Duration, Instant};
|
||||
use types::EthSpec;
|
||||
|
||||
/// Spawns a notifier service which periodically logs information about the node.
|
||||
pub fn spawn_notifier<T: EthSpec>(client: &ProductionValidatorClient<T>) -> Result<Signal, String> {
|
||||
pub fn spawn_notifier<T: EthSpec>(client: &ProductionValidatorClient<T>) -> Result<(), String> {
|
||||
let context = client.context.service_context("notifier".into());
|
||||
let runtime_handle = context.runtime_handle.clone();
|
||||
let log = context.log.clone();
|
||||
let executor = context.executor.clone();
|
||||
let duties_service = client.duties_service.clone();
|
||||
let allow_unsynced_beacon_node = client.config.allow_unsynced_beacon_node;
|
||||
|
||||
@@ -25,7 +23,7 @@ pub fn spawn_notifier<T: EthSpec>(client: &ProductionValidatorClient<T>) -> Resu
|
||||
let mut interval = interval_at(start_instant, slot_duration);
|
||||
|
||||
let interval_fut = async move {
|
||||
let log = &context.log;
|
||||
let log = context.log();
|
||||
|
||||
while interval.next().await.is_some() {
|
||||
if !is_synced(
|
||||
@@ -83,12 +81,6 @@ pub fn spawn_notifier<T: EthSpec>(client: &ProductionValidatorClient<T>) -> Resu
|
||||
}
|
||||
};
|
||||
|
||||
let (exit_signal, exit) = exit_future::signal();
|
||||
let future = futures::future::select(
|
||||
Box::pin(interval_fut),
|
||||
exit.map(move |_| info!(log, "Shutdown complete")),
|
||||
);
|
||||
runtime_handle.spawn(future);
|
||||
|
||||
Ok(exit_signal)
|
||||
executor.spawn(interval_fut, "validator_notifier");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user