mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-08 09:16:00 +00:00
Validator client tasks use task executor
This commit is contained in:
@@ -3,8 +3,7 @@ use crate::{
|
|||||||
validator_store::ValidatorStore,
|
validator_store::ValidatorStore,
|
||||||
};
|
};
|
||||||
use environment::RuntimeContext;
|
use environment::RuntimeContext;
|
||||||
use exit_future::Signal;
|
use futures::{StreamExt, TryFutureExt};
|
||||||
use futures::{FutureExt, StreamExt};
|
|
||||||
use remote_beacon_node::{PublishStatus, RemoteBeaconNode};
|
use remote_beacon_node::{PublishStatus, RemoteBeaconNode};
|
||||||
use slog::{crit, debug, info, trace};
|
use slog::{crit, debug, info, trace};
|
||||||
use slot_clock::SlotClock;
|
use slot_clock::SlotClock;
|
||||||
@@ -118,7 +117,7 @@ impl<T, E: EthSpec> Deref for AttestationService<T, E> {
|
|||||||
|
|
||||||
impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||||
/// Starts the service which periodically produces attestations.
|
/// Starts the service which periodically produces attestations.
|
||||||
pub fn start_update_service(self, spec: &ChainSpec) -> Result<Signal, String> {
|
pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
|
||||||
let log = self.context.log.clone();
|
let log = self.context.log.clone();
|
||||||
|
|
||||||
let slot_duration = Duration::from_millis(spec.milliseconds_per_slot);
|
let slot_duration = Duration::from_millis(spec.milliseconds_per_slot);
|
||||||
@@ -141,8 +140,6 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
|||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
|
||||||
let (exit_signal, exit_fut) = exit_future::signal();
|
|
||||||
|
|
||||||
let runtime_handle = self.context.runtime_handle.clone();
|
let runtime_handle = self.context.runtime_handle.clone();
|
||||||
|
|
||||||
let interval_fut = async move {
|
let interval_fut = async move {
|
||||||
@@ -164,13 +161,8 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let future = futures::future::select(
|
runtime_handle.spawn(interval_fut, "attestation_service");
|
||||||
Box::pin(interval_fut),
|
Ok(())
|
||||||
exit_fut.map(move |_| info!(log, "Shutdown complete")),
|
|
||||||
);
|
|
||||||
runtime_handle.spawn(future);
|
|
||||||
|
|
||||||
Ok(exit_signal)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// For each each required attestation, spawn a new task that downloads, signs and uploads the
|
/// For each each required attestation, spawn a new task that downloads, signs and uploads the
|
||||||
@@ -215,12 +207,15 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
|||||||
.for_each(|(committee_index, validator_duties)| {
|
.for_each(|(committee_index, validator_duties)| {
|
||||||
// Spawn a separate task for each attestation.
|
// Spawn a separate task for each attestation.
|
||||||
self.inner.context.runtime_handle.spawn(
|
self.inner.context.runtime_handle.spawn(
|
||||||
self.clone().publish_attestations_and_aggregates(
|
self.clone()
|
||||||
slot,
|
.publish_attestations_and_aggregates(
|
||||||
committee_index,
|
slot,
|
||||||
validator_duties,
|
committee_index,
|
||||||
aggregate_production_instant,
|
validator_duties,
|
||||||
),
|
aggregate_production_instant,
|
||||||
|
)
|
||||||
|
.unwrap_or_else(|_| ()),
|
||||||
|
"duties_by_committee_index",
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
use crate::{duties_service::DutiesService, validator_store::ValidatorStore};
|
use crate::{duties_service::DutiesService, validator_store::ValidatorStore};
|
||||||
use environment::RuntimeContext;
|
use environment::RuntimeContext;
|
||||||
use exit_future::Signal;
|
use futures::{StreamExt, TryFutureExt};
|
||||||
use futures::{FutureExt, StreamExt, TryFutureExt};
|
|
||||||
use remote_beacon_node::{PublishStatus, RemoteBeaconNode};
|
use remote_beacon_node::{PublishStatus, RemoteBeaconNode};
|
||||||
use slog::{crit, error, info, trace};
|
use slog::{crit, error, info, trace};
|
||||||
use slot_clock::SlotClock;
|
use slot_clock::SlotClock;
|
||||||
@@ -113,7 +112,7 @@ impl<T, E: EthSpec> Deref for BlockService<T, E> {
|
|||||||
|
|
||||||
impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
|
impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
|
||||||
/// Starts the service that periodically attempts to produce blocks.
|
/// Starts the service that periodically attempts to produce blocks.
|
||||||
pub fn start_update_service(self, spec: &ChainSpec) -> Result<Signal, String> {
|
pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
|
||||||
let log = self.context.log.clone();
|
let log = self.context.log.clone();
|
||||||
|
|
||||||
let duration_to_next_slot = self
|
let duration_to_next_slot = self
|
||||||
@@ -144,15 +143,9 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let (exit_signal, exit_fut) = exit_future::signal();
|
runtime_handle.spawn(interval_fut, "block_service");
|
||||||
|
|
||||||
let future = futures::future::select(
|
Ok(())
|
||||||
Box::pin(interval_fut),
|
|
||||||
exit_fut.map(move |_| info!(log, "Shutdown complete")),
|
|
||||||
);
|
|
||||||
runtime_handle.spawn(future);
|
|
||||||
|
|
||||||
Ok(exit_signal)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Attempt to produce a block for any block producers in the `ValidatorStore`.
|
/// Attempt to produce a block for any block producers in the `ValidatorStore`.
|
||||||
@@ -190,6 +183,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
|
|||||||
iter.for_each(|validator_pubkey| {
|
iter.for_each(|validator_pubkey| {
|
||||||
let service = self.clone();
|
let service = self.clone();
|
||||||
let log = log.clone();
|
let log = log.clone();
|
||||||
|
// TODO: run this task with a `spawn_without_name`
|
||||||
self.inner.context.runtime_handle.spawn(
|
self.inner.context.runtime_handle.spawn(
|
||||||
service
|
service
|
||||||
.publish_block(slot, validator_pubkey)
|
.publish_block(slot, validator_pubkey)
|
||||||
@@ -199,7 +193,9 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
|
|||||||
"Error whilst producing block";
|
"Error whilst producing block";
|
||||||
"message" => e
|
"message" => e
|
||||||
)
|
)
|
||||||
}),
|
})
|
||||||
|
.unwrap_or_else(|_| ()),
|
||||||
|
"publish_block",
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -1,11 +1,10 @@
|
|||||||
use crate::{is_synced::is_synced, validator_store::ValidatorStore};
|
use crate::{is_synced::is_synced, validator_store::ValidatorStore};
|
||||||
use environment::RuntimeContext;
|
use environment::RuntimeContext;
|
||||||
use exit_future::Signal;
|
use futures::StreamExt;
|
||||||
use futures::{FutureExt, StreamExt};
|
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use remote_beacon_node::{PublishStatus, RemoteBeaconNode};
|
use remote_beacon_node::{PublishStatus, RemoteBeaconNode};
|
||||||
use rest_types::{ValidatorDuty, ValidatorDutyBytes, ValidatorSubscription};
|
use rest_types::{ValidatorDuty, ValidatorDutyBytes, ValidatorSubscription};
|
||||||
use slog::{debug, error, info, trace, warn};
|
use slog::{debug, error, trace, warn};
|
||||||
use slot_clock::SlotClock;
|
use slot_clock::SlotClock;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
@@ -439,9 +438,7 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Start the service that periodically polls the beacon node for validator duties.
|
/// Start the service that periodically polls the beacon node for validator duties.
|
||||||
pub fn start_update_service(self, spec: &ChainSpec) -> Result<Signal, String> {
|
pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
|
||||||
let log = self.context.log.clone();
|
|
||||||
|
|
||||||
let duration_to_next_slot = self
|
let duration_to_next_slot = self
|
||||||
.slot_clock
|
.slot_clock
|
||||||
.duration_to_next_slot()
|
.duration_to_next_slot()
|
||||||
@@ -456,12 +453,11 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
|
|||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
|
||||||
let (exit_signal, exit_fut) = exit_future::signal();
|
|
||||||
|
|
||||||
// Run an immediate update before starting the updater service.
|
// Run an immediate update before starting the updater service.
|
||||||
self.inner
|
self.inner
|
||||||
.context
|
.context
|
||||||
.runtime_handle
|
.runtime_handle
|
||||||
|
.runtime_handle()
|
||||||
.spawn(self.clone().do_update());
|
.spawn(self.clone().do_update());
|
||||||
|
|
||||||
let runtime_handle = self.inner.context.runtime_handle.clone();
|
let runtime_handle = self.inner.context.runtime_handle.clone();
|
||||||
@@ -472,13 +468,9 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let future = futures::future::select(
|
runtime_handle.spawn(interval_fut, "duties_service");
|
||||||
Box::pin(interval_fut),
|
|
||||||
exit_fut.map(move |_| info!(log, "Shutdown complete")),
|
|
||||||
);
|
|
||||||
runtime_handle.spawn(future);
|
|
||||||
|
|
||||||
Ok(exit_signal)
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Attempt to download the duties of all managed validators for this epoch and the next.
|
/// Attempt to download the duties of all managed validators for this epoch and the next.
|
||||||
|
|||||||
@@ -1,9 +1,8 @@
|
|||||||
use environment::RuntimeContext;
|
use environment::RuntimeContext;
|
||||||
use exit_future::Signal;
|
use futures::StreamExt;
|
||||||
use futures::{FutureExt, StreamExt};
|
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use remote_beacon_node::RemoteBeaconNode;
|
use remote_beacon_node::RemoteBeaconNode;
|
||||||
use slog::{debug, info, trace};
|
use slog::{debug, trace};
|
||||||
use slot_clock::SlotClock;
|
use slot_clock::SlotClock;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@@ -100,9 +99,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ForkService<T, E> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Starts the service that periodically polls for the `Fork`.
|
/// Starts the service that periodically polls for the `Fork`.
|
||||||
pub fn start_update_service(self, spec: &ChainSpec) -> Result<Signal, String> {
|
pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
|
||||||
let log = self.context.log.clone();
|
|
||||||
|
|
||||||
let duration_to_next_epoch = self
|
let duration_to_next_epoch = self
|
||||||
.slot_clock
|
.slot_clock
|
||||||
.duration_to_next_epoch(E::slots_per_epoch())
|
.duration_to_next_epoch(E::slots_per_epoch())
|
||||||
@@ -117,12 +114,11 @@ impl<T: SlotClock + 'static, E: EthSpec> ForkService<T, E> {
|
|||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
|
||||||
let (exit_signal, exit_fut) = exit_future::signal();
|
|
||||||
|
|
||||||
// Run an immediate update before starting the updater service.
|
// Run an immediate update before starting the updater service.
|
||||||
self.inner
|
self.inner
|
||||||
.context
|
.context
|
||||||
.runtime_handle
|
.runtime_handle
|
||||||
|
.runtime_handle()
|
||||||
.spawn(self.clone().do_update());
|
.spawn(self.clone().do_update());
|
||||||
|
|
||||||
let runtime_handle = self.inner.context.runtime_handle.clone();
|
let runtime_handle = self.inner.context.runtime_handle.clone();
|
||||||
@@ -133,13 +129,9 @@ impl<T: SlotClock + 'static, E: EthSpec> ForkService<T, E> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let future = futures::future::select(
|
runtime_handle.spawn(interval_fut, "fork_service");
|
||||||
Box::pin(interval_fut),
|
|
||||||
exit_fut.map(move |_| info!(log, "Shutdown complete")),
|
|
||||||
);
|
|
||||||
runtime_handle.spawn(future);
|
|
||||||
|
|
||||||
Ok(exit_signal)
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Attempts to download the `Fork` from the server.
|
/// Attempts to download the `Fork` from the server.
|
||||||
|
|||||||
@@ -16,7 +16,6 @@ use block_service::{BlockService, BlockServiceBuilder};
|
|||||||
use clap::ArgMatches;
|
use clap::ArgMatches;
|
||||||
use duties_service::{DutiesService, DutiesServiceBuilder};
|
use duties_service::{DutiesService, DutiesServiceBuilder};
|
||||||
use environment::RuntimeContext;
|
use environment::RuntimeContext;
|
||||||
use exit_future::Signal;
|
|
||||||
use fork_service::{ForkService, ForkServiceBuilder};
|
use fork_service::{ForkService, ForkServiceBuilder};
|
||||||
use notifier::spawn_notifier;
|
use notifier::spawn_notifier;
|
||||||
use remote_beacon_node::RemoteBeaconNode;
|
use remote_beacon_node::RemoteBeaconNode;
|
||||||
@@ -40,7 +39,6 @@ pub struct ProductionValidatorClient<T: EthSpec> {
|
|||||||
fork_service: ForkService<SystemTimeSlotClock, T>,
|
fork_service: ForkService<SystemTimeSlotClock, T>,
|
||||||
block_service: BlockService<SystemTimeSlotClock, T>,
|
block_service: BlockService<SystemTimeSlotClock, T>,
|
||||||
attestation_service: AttestationService<SystemTimeSlotClock, T>,
|
attestation_service: AttestationService<SystemTimeSlotClock, T>,
|
||||||
exit_signals: Vec<Signal>,
|
|
||||||
config: Config,
|
config: Config,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -208,46 +206,36 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
|
|||||||
fork_service,
|
fork_service,
|
||||||
block_service,
|
block_service,
|
||||||
attestation_service,
|
attestation_service,
|
||||||
exit_signals: vec![],
|
|
||||||
config,
|
config,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn start_service(&mut self) -> Result<(), String> {
|
pub fn start_service(&mut self) -> Result<(), String> {
|
||||||
let duties_exit = self
|
let _ = self
|
||||||
.duties_service
|
.duties_service
|
||||||
.clone()
|
.clone()
|
||||||
.start_update_service(&self.context.eth2_config.spec)
|
.start_update_service(&self.context.eth2_config.spec)
|
||||||
.map_err(|e| format!("Unable to start duties service: {}", e))?;
|
.map_err(|e| format!("Unable to start duties service: {}", e))?;
|
||||||
|
|
||||||
let fork_exit = self
|
let _ = self
|
||||||
.fork_service
|
.fork_service
|
||||||
.clone()
|
.clone()
|
||||||
.start_update_service(&self.context.eth2_config.spec)
|
.start_update_service(&self.context.eth2_config.spec)
|
||||||
.map_err(|e| format!("Unable to start fork service: {}", e))?;
|
.map_err(|e| format!("Unable to start fork service: {}", e))?;
|
||||||
|
|
||||||
let block_exit = self
|
let _ = self
|
||||||
.block_service
|
.block_service
|
||||||
.clone()
|
.clone()
|
||||||
.start_update_service(&self.context.eth2_config.spec)
|
.start_update_service(&self.context.eth2_config.spec)
|
||||||
.map_err(|e| format!("Unable to start block service: {}", e))?;
|
.map_err(|e| format!("Unable to start block service: {}", e))?;
|
||||||
|
|
||||||
let attestation_exit = self
|
let _ = self
|
||||||
.attestation_service
|
.attestation_service
|
||||||
.clone()
|
.clone()
|
||||||
.start_update_service(&self.context.eth2_config.spec)
|
.start_update_service(&self.context.eth2_config.spec)
|
||||||
.map_err(|e| format!("Unable to start attestation service: {}", e))?;
|
.map_err(|e| format!("Unable to start attestation service: {}", e))?;
|
||||||
|
|
||||||
let notifier_exit =
|
let _ = spawn_notifier(self).map_err(|e| format!("Failed to start notifier: {}", e))?;
|
||||||
spawn_notifier(self).map_err(|e| format!("Failed to start notifier: {}", e))?;
|
|
||||||
|
|
||||||
self.exit_signals = vec![
|
|
||||||
duties_exit,
|
|
||||||
fork_exit,
|
|
||||||
block_exit,
|
|
||||||
attestation_exit,
|
|
||||||
notifier_exit,
|
|
||||||
];
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,16 +1,14 @@
|
|||||||
use crate::{is_synced::is_synced, ProductionValidatorClient};
|
use crate::{is_synced::is_synced, ProductionValidatorClient};
|
||||||
use exit_future::Signal;
|
use futures::StreamExt;
|
||||||
use futures::{FutureExt, StreamExt};
|
|
||||||
use slog::{error, info};
|
use slog::{error, info};
|
||||||
use slot_clock::SlotClock;
|
use slot_clock::SlotClock;
|
||||||
use tokio::time::{interval_at, Duration, Instant};
|
use tokio::time::{interval_at, Duration, Instant};
|
||||||
use types::EthSpec;
|
use types::EthSpec;
|
||||||
|
|
||||||
/// Spawns a notifier service which periodically logs information about the node.
|
/// Spawns a notifier service which periodically logs information about the node.
|
||||||
pub fn spawn_notifier<T: EthSpec>(client: &ProductionValidatorClient<T>) -> Result<Signal, String> {
|
pub fn spawn_notifier<T: EthSpec>(client: &ProductionValidatorClient<T>) -> Result<(), String> {
|
||||||
let context = client.context.service_context("notifier".into());
|
let context = client.context.service_context("notifier".into());
|
||||||
let runtime_handle = context.runtime_handle.clone();
|
let runtime_handle = context.runtime_handle.clone();
|
||||||
let log = context.log.clone();
|
|
||||||
let duties_service = client.duties_service.clone();
|
let duties_service = client.duties_service.clone();
|
||||||
let allow_unsynced_beacon_node = client.config.allow_unsynced_beacon_node;
|
let allow_unsynced_beacon_node = client.config.allow_unsynced_beacon_node;
|
||||||
|
|
||||||
@@ -83,12 +81,6 @@ pub fn spawn_notifier<T: EthSpec>(client: &ProductionValidatorClient<T>) -> Resu
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let (exit_signal, exit) = exit_future::signal();
|
runtime_handle.spawn(interval_fut, "validator_notifier");
|
||||||
let future = futures::future::select(
|
Ok(())
|
||||||
Box::pin(interval_fut),
|
|
||||||
exit.map(move |_| info!(log, "Shutdown complete")),
|
|
||||||
);
|
|
||||||
runtime_handle.spawn(future);
|
|
||||||
|
|
||||||
Ok(exit_signal)
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user