Revert "Merge pull request #200 from sigp/new-structure"

This reverts commit d7a3545be1, reversing
changes made to 1da06c156c.
This commit is contained in:
Paul Hauner
2019-02-14 12:09:18 +11:00
parent d7a3545be1
commit 35c914baa6
163 changed files with 15510 additions and 137 deletions

View File

@@ -0,0 +1,20 @@
[package]
name = "validator_client"
version = "0.1.0"
authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = "2018"
[dependencies]
block_producer = { path = "../eth2/block_producer" }
bls = { path = "../eth2/utils/bls" }
clap = "2.32.0"
dirs = "1.0.3"
grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] }
protobuf = "2.0.2"
protos = { path = "../protos" }
slot_clock = { path = "../eth2/utils/slot_clock" }
types = { path = "../eth2/types" }
slog = "^2.2.3"
slog-term = "^2.4.0"
slog-async = "^2.3.0"
ssz = { path = "../eth2/utils/ssz" }

View File

@@ -0,0 +1,67 @@
# Lighthouse Validator Client
The Validator Client (VC) is a stand-alone binary which connects to a Beacon
Node (BN) and fulfils the roles of a validator.
## Roles
The VC is responsible for the following tasks:
- Requesting validator duties (a.k.a. shuffling) from the BN.
- Prompting the BN to produce a new block, when a validators block production
duties require.
- Completing all the fields on a new block (e.g., RANDAO reveal, signature) and
publishing the block to a BN.
- Prompting the BN to produce a new shard atteststation as per a validators
duties.
- Ensuring that no slashable messages are signed by a validator private key.
- Keeping track of the system clock and how it relates to slots/epochs.
The VC is capable of managing multiple validators in the same process tree.
## Implementation
_This section describes the present implementation of this VC binary._
### Services
Each validator is represented by two services, one which tracks the validator
duties and another which performs block production duties.
A separate thread is maintained for each service, for each validator. As such,
a single validator utilises three (3) threads (one for the base VC and two for
each service) and two validators utilise five (5) threads.
#### `DutiesManagerService`
Polls a BN and requests validator responsibilities, as well as a validator
index. The outcome of a successful poll is a `EpochDuties` struct:
```rust
EpochDuties {
validator_index: u64,
block_prodcution_slot: u64,
}
```
This is stored in the `EpochDutiesMap`, a `HashMap` mapping `epoch ->
EpochDuties`.
#### `BlockProducerService`
Polls the system clock and determines if a block needs to be produced. Reads
from the `EpochDutiesMap` maintained by the `DutiesManagerService`.
If block production is required, performs all the necessary duties to request,
complete and return a block from the BN.
### Configuration
Presently the validator specifics (pubkey, etc.) are randomly generated and the
chain specification (slot length, BLS domain, etc.) are fixed to foundation
parameters. This is temporary and will be upgrade so these parameters can be
read from file (or initialized on first-boot).
## BN Communication
The VC communicates with the BN via a gRPC/protobuf connection.

View File

@@ -0,0 +1,102 @@
use block_producer::{BeaconNode, BeaconNodeError, PublishOutcome};
use protos::services::{
BeaconBlock as GrpcBeaconBlock, ProduceBeaconBlockRequest, PublishBeaconBlockRequest,
};
use protos::services_grpc::BeaconBlockServiceClient;
use ssz::{ssz_encode, Decodable};
use std::sync::Arc;
use types::{BeaconBlock, BeaconBlockBody, Eth1Data, Hash256, Signature, Slot};
/// A newtype designed to wrap the gRPC-generated service so the `BeaconNode` trait may be
/// implemented upon it.
pub struct BeaconBlockGrpcClient {
client: Arc<BeaconBlockServiceClient>,
}
impl BeaconBlockGrpcClient {
pub fn new(client: Arc<BeaconBlockServiceClient>) -> Self {
Self { client }
}
}
impl BeaconNode for BeaconBlockGrpcClient {
/// Request a Beacon Node (BN) to produce a new block at the supplied slot.
///
/// Returns `None` if it is not possible to produce at the supplied slot. For example, if the
/// BN is unable to find a parent block.
fn produce_beacon_block(
&self,
slot: Slot,
// TODO: use randao_reveal, when proto APIs have been updated.
_randao_reveal: &Signature,
) -> Result<Option<BeaconBlock>, BeaconNodeError> {
let mut req = ProduceBeaconBlockRequest::new();
req.set_slot(slot.as_u64());
let reply = self
.client
.produce_beacon_block(&req)
.map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?;
if reply.has_block() {
let block = reply.get_block();
let (signature, _) = Signature::ssz_decode(block.get_signature(), 0)
.map_err(|_| BeaconNodeError::DecodeFailure)?;
let (randao_reveal, _) = Signature::ssz_decode(block.get_randao_reveal(), 0)
.map_err(|_| BeaconNodeError::DecodeFailure)?;
// TODO: this conversion is incomplete; fix it.
Ok(Some(BeaconBlock {
slot: Slot::new(block.get_slot()),
parent_root: Hash256::zero(),
state_root: Hash256::zero(),
randao_reveal,
eth1_data: Eth1Data {
deposit_root: Hash256::zero(),
block_hash: Hash256::zero(),
},
signature,
body: BeaconBlockBody {
proposer_slashings: vec![],
attester_slashings: vec![],
attestations: vec![],
deposits: vec![],
exits: vec![],
},
}))
} else {
Ok(None)
}
}
/// Request a Beacon Node (BN) to publish a block.
///
/// Generally, this will be called after a `produce_beacon_block` call with a block that has
/// been completed (signed) by the validator client.
fn publish_beacon_block(&self, block: BeaconBlock) -> Result<PublishOutcome, BeaconNodeError> {
let mut req = PublishBeaconBlockRequest::new();
// TODO: this conversion is incomplete; fix it.
let mut grpc_block = GrpcBeaconBlock::new();
grpc_block.set_slot(block.slot.as_u64());
grpc_block.set_block_root(vec![0]);
grpc_block.set_randao_reveal(ssz_encode(&block.randao_reveal));
grpc_block.set_signature(ssz_encode(&block.signature));
req.set_block(grpc_block);
let reply = self
.client
.publish_beacon_block(&req)
.map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?;
if reply.get_success() {
Ok(PublishOutcome::ValidBlock)
} else {
// TODO: distinguish between different errors
Ok(PublishOutcome::InvalidBlock("Publish failed".to_string()))
}
}
}

View File

@@ -0,0 +1,58 @@
mod beacon_block_grpc_client;
// mod block_producer_service;
use block_producer::{
BeaconNode, BlockProducer, DutiesReader, PollOutcome as BlockProducerPollOutcome, Signer,
};
use slog::{error, info, warn, Logger};
use slot_clock::SlotClock;
use std::time::Duration;
pub use self::beacon_block_grpc_client::BeaconBlockGrpcClient;
pub struct BlockProducerService<T: SlotClock, U: BeaconNode, V: DutiesReader, W: Signer> {
pub block_producer: BlockProducer<T, U, V, W>,
pub poll_interval_millis: u64,
pub log: Logger,
}
impl<T: SlotClock, U: BeaconNode, V: DutiesReader, W: Signer> BlockProducerService<T, U, V, W> {
/// Run a loop which polls the block producer each `poll_interval_millis` millseconds.
///
/// Logs the results of the polls.
pub fn run(&mut self) {
loop {
match self.block_producer.poll() {
Err(error) => {
error!(self.log, "Block producer poll error"; "error" => format!("{:?}", error))
}
Ok(BlockProducerPollOutcome::BlockProduced(slot)) => {
info!(self.log, "Produced block"; "slot" => slot)
}
Ok(BlockProducerPollOutcome::SlashableBlockNotProduced(slot)) => {
warn!(self.log, "Slashable block was not signed"; "slot" => slot)
}
Ok(BlockProducerPollOutcome::BlockProductionNotRequired(slot)) => {
info!(self.log, "Block production not required"; "slot" => slot)
}
Ok(BlockProducerPollOutcome::ProducerDutiesUnknown(slot)) => {
error!(self.log, "Block production duties unknown"; "slot" => slot)
}
Ok(BlockProducerPollOutcome::SlotAlreadyProcessed(slot)) => {
warn!(self.log, "Attempted to re-process slot"; "slot" => slot)
}
Ok(BlockProducerPollOutcome::BeaconNodeUnableToProduceBlock(slot)) => {
error!(self.log, "Beacon node unable to produce block"; "slot" => slot)
}
Ok(BlockProducerPollOutcome::SignerRejection(slot)) => {
error!(self.log, "The cryptographic signer refused to sign the block"; "slot" => slot)
}
Ok(BlockProducerPollOutcome::ValidatorIsUnknown(slot)) => {
error!(self.log, "The Beacon Node does not recognise the validator"; "slot" => slot)
}
};
std::thread::sleep(Duration::from_millis(self.poll_interval_millis));
}
}
}

View File

@@ -0,0 +1,25 @@
use std::fs;
use std::path::PathBuf;
/// Stores the core configuration for this validator instance.
#[derive(Clone)]
pub struct ClientConfig {
pub data_dir: PathBuf,
pub server: String,
}
const DEFAULT_LIGHTHOUSE_DIR: &str = ".lighthouse-validators";
impl ClientConfig {
/// Build a new configuration from defaults.
pub fn default() -> Self {
let data_dir = {
let home = dirs::home_dir().expect("Unable to determine home dir.");
home.join(DEFAULT_LIGHTHOUSE_DIR)
};
fs::create_dir_all(&data_dir)
.unwrap_or_else(|_| panic!("Unable to create {:?}", &data_dir));
let server = "localhost:50051".to_string();
Self { data_dir, server }
}
}

View File

@@ -0,0 +1,80 @@
use block_producer::{DutiesReader, DutiesReaderError};
use std::collections::HashMap;
use std::sync::RwLock;
use types::{Epoch, Slot};
/// The information required for a validator to propose and attest during some epoch.
///
/// Generally obtained from a Beacon Node, this information contains the validators canonical index
/// (thier sequence in the global validator induction process) and the "shuffling" for that index
/// for some epoch.
#[derive(Debug, PartialEq, Clone, Copy, Default)]
pub struct EpochDuties {
pub validator_index: u64,
pub block_production_slot: Option<Slot>,
// Future shard info
}
impl EpochDuties {
/// Returns `true` if the supplied `slot` is a slot in which the validator should produce a
/// block.
pub fn is_block_production_slot(&self, slot: Slot) -> bool {
match self.block_production_slot {
Some(s) if s == slot => true,
_ => false,
}
}
}
pub enum EpochDutiesMapError {
Poisoned,
}
/// Maps an `epoch` to some `EpochDuties` for a single validator.
pub struct EpochDutiesMap {
pub epoch_length: u64,
pub map: RwLock<HashMap<Epoch, EpochDuties>>,
}
impl EpochDutiesMap {
pub fn new(epoch_length: u64) -> Self {
Self {
epoch_length,
map: RwLock::new(HashMap::new()),
}
}
pub fn get(&self, epoch: Epoch) -> Result<Option<EpochDuties>, EpochDutiesMapError> {
let map = self.map.read().map_err(|_| EpochDutiesMapError::Poisoned)?;
match map.get(&epoch) {
Some(duties) => Ok(Some(*duties)),
None => Ok(None),
}
}
pub fn insert(
&self,
epoch: Epoch,
epoch_duties: EpochDuties,
) -> Result<Option<EpochDuties>, EpochDutiesMapError> {
let mut map = self
.map
.write()
.map_err(|_| EpochDutiesMapError::Poisoned)?;
Ok(map.insert(epoch, epoch_duties))
}
}
impl DutiesReader for EpochDutiesMap {
fn is_block_production_slot(&self, slot: Slot) -> Result<bool, DutiesReaderError> {
let epoch = slot.epoch(self.epoch_length);
let map = self.map.read().map_err(|_| DutiesReaderError::Poisoned)?;
let duties = map
.get(&epoch)
.ok_or_else(|| DutiesReaderError::UnknownEpoch)?;
Ok(duties.is_block_production_slot(slot))
}
}
// TODO: add tests.

View File

@@ -0,0 +1,54 @@
use super::traits::{BeaconNode, BeaconNodeError};
use super::EpochDuties;
use protos::services::{ProposeBlockSlotRequest, PublicKey as IndexRequest};
use protos::services_grpc::ValidatorServiceClient;
use ssz::ssz_encode;
use types::{Epoch, PublicKey, Slot};
impl BeaconNode for ValidatorServiceClient {
/// Request the shuffling from the Beacon Node (BN).
///
/// As this function takes a `PublicKey`, it will first attempt to resolve the public key into
/// a validator index, then call the BN for production/attestation duties.
///
/// Note: presently only block production information is returned.
fn request_shuffling(
&self,
epoch: Epoch,
public_key: &PublicKey,
) -> Result<Option<EpochDuties>, BeaconNodeError> {
// Lookup the validator index for the supplied public key.
let validator_index = {
let mut req = IndexRequest::new();
req.set_public_key(ssz_encode(public_key).to_vec());
let resp = self
.validator_index(&req)
.map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?;
resp.get_index()
};
let mut req = ProposeBlockSlotRequest::new();
req.set_validator_index(validator_index);
req.set_epoch(epoch.as_u64());
let reply = self
.propose_block_slot(&req)
.map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?;
let block_production_slot = if reply.has_slot() {
Some(reply.get_slot())
} else {
None
};
let block_production_slot = match block_production_slot {
Some(slot) => Some(Slot::new(slot)),
None => None,
};
Ok(Some(EpochDuties {
validator_index,
block_production_slot,
}))
}
}

View File

@@ -0,0 +1,161 @@
mod epoch_duties;
mod grpc;
mod service;
#[cfg(test)]
mod test_node;
mod traits;
pub use self::epoch_duties::EpochDutiesMap;
use self::epoch_duties::{EpochDuties, EpochDutiesMapError};
pub use self::service::DutiesManagerService;
use self::traits::{BeaconNode, BeaconNodeError};
use bls::PublicKey;
use slot_clock::SlotClock;
use std::sync::Arc;
use types::{ChainSpec, Epoch};
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum PollOutcome {
/// The `EpochDuties` were not updated during this poll.
NoChange(Epoch),
/// The `EpochDuties` for the `epoch` were previously unknown, but obtained in the poll.
NewDuties(Epoch, EpochDuties),
/// New `EpochDuties` were obtained, different to those which were previously known. This is
/// likely to be the result of chain re-organisation.
DutiesChanged(Epoch, EpochDuties),
/// The Beacon Node was unable to return the duties as the validator is unknown, or the
/// shuffling for the epoch is unknown.
UnknownValidatorOrEpoch(Epoch),
}
#[derive(Debug, PartialEq)]
pub enum Error {
SlotClockError,
SlotUnknowable,
EpochMapPoisoned,
BeaconNodeError(BeaconNodeError),
}
/// A polling state machine which ensures the latest `EpochDuties` are obtained from the Beacon
/// Node.
///
/// There is a single `DutiesManager` per validator instance.
pub struct DutiesManager<T: SlotClock, U: BeaconNode> {
pub duties_map: Arc<EpochDutiesMap>,
/// The validator's public key.
pub pubkey: PublicKey,
pub spec: Arc<ChainSpec>,
pub slot_clock: Arc<T>,
pub beacon_node: Arc<U>,
}
impl<T: SlotClock, U: BeaconNode> DutiesManager<T, U> {
/// Poll the Beacon Node for `EpochDuties`.
///
/// The present `epoch` will be learned from the supplied `SlotClock`. In production this will
/// be a wall-clock (e.g., system time, remote server time, etc.).
pub fn poll(&self) -> Result<PollOutcome, Error> {
let slot = self
.slot_clock
.present_slot()
.map_err(|_| Error::SlotClockError)?
.ok_or(Error::SlotUnknowable)?;
let epoch = slot.epoch(self.spec.epoch_length);
if let Some(duties) = self.beacon_node.request_shuffling(epoch, &self.pubkey)? {
// If these duties were known, check to see if they're updates or identical.
let result = if let Some(known_duties) = self.duties_map.get(epoch)? {
if known_duties == duties {
Ok(PollOutcome::NoChange(epoch))
} else {
Ok(PollOutcome::DutiesChanged(epoch, duties))
}
} else {
Ok(PollOutcome::NewDuties(epoch, duties))
};
self.duties_map.insert(epoch, duties)?;
result
} else {
Ok(PollOutcome::UnknownValidatorOrEpoch(epoch))
}
}
}
impl From<BeaconNodeError> for Error {
fn from(e: BeaconNodeError) -> Error {
Error::BeaconNodeError(e)
}
}
impl From<EpochDutiesMapError> for Error {
fn from(e: EpochDutiesMapError) -> Error {
match e {
EpochDutiesMapError::Poisoned => Error::EpochMapPoisoned,
}
}
}
#[cfg(test)]
mod tests {
use super::test_node::TestBeaconNode;
use super::*;
use bls::Keypair;
use slot_clock::TestingSlotClock;
use types::Slot;
// TODO: implement more thorough testing.
// https://github.com/sigp/lighthouse/issues/160
//
// These tests should serve as a good example for future tests.
#[test]
pub fn polling() {
let spec = Arc::new(ChainSpec::foundation());
let duties_map = Arc::new(EpochDutiesMap::new(spec.epoch_length));
let keypair = Keypair::random();
let slot_clock = Arc::new(TestingSlotClock::new(0));
let beacon_node = Arc::new(TestBeaconNode::default());
let manager = DutiesManager {
spec: spec.clone(),
pubkey: keypair.pk.clone(),
duties_map: duties_map.clone(),
slot_clock: slot_clock.clone(),
beacon_node: beacon_node.clone(),
};
// Configure response from the BeaconNode.
let duties = EpochDuties {
validator_index: 0,
block_production_slot: Some(Slot::new(10)),
};
beacon_node.set_next_shuffling_result(Ok(Some(duties)));
// Get the duties for the first time...
assert_eq!(
manager.poll(),
Ok(PollOutcome::NewDuties(Epoch::new(0), duties))
);
// Get the same duties again...
assert_eq!(manager.poll(), Ok(PollOutcome::NoChange(Epoch::new(0))));
// Return new duties.
let duties = EpochDuties {
validator_index: 0,
block_production_slot: Some(Slot::new(11)),
};
beacon_node.set_next_shuffling_result(Ok(Some(duties)));
assert_eq!(
manager.poll(),
Ok(PollOutcome::DutiesChanged(Epoch::new(0), duties))
);
// Return no duties.
beacon_node.set_next_shuffling_result(Ok(None));
assert_eq!(
manager.poll(),
Ok(PollOutcome::UnknownValidatorOrEpoch(Epoch::new(0)))
);
}
}

View File

@@ -0,0 +1,40 @@
use super::traits::BeaconNode;
use super::{DutiesManager, PollOutcome};
use slog::{debug, error, info, Logger};
use slot_clock::SlotClock;
use std::time::Duration;
pub struct DutiesManagerService<T: SlotClock, U: BeaconNode> {
pub manager: DutiesManager<T, U>,
pub poll_interval_millis: u64,
pub log: Logger,
}
impl<T: SlotClock, U: BeaconNode> DutiesManagerService<T, U> {
/// Run a loop which polls the manager each `poll_interval_millis` milliseconds.
///
/// Logs the results of the polls.
pub fn run(&mut self) {
loop {
match self.manager.poll() {
Err(error) => {
error!(self.log, "Epoch duties poll error"; "error" => format!("{:?}", error))
}
Ok(PollOutcome::NoChange(epoch)) => {
debug!(self.log, "No change in duties"; "epoch" => epoch)
}
Ok(PollOutcome::DutiesChanged(epoch, duties)) => {
info!(self.log, "Duties changed (potential re-org)"; "epoch" => epoch, "duties" => format!("{:?}", duties))
}
Ok(PollOutcome::NewDuties(epoch, duties)) => {
info!(self.log, "New duties obtained"; "epoch" => epoch, "duties" => format!("{:?}", duties))
}
Ok(PollOutcome::UnknownValidatorOrEpoch(epoch)) => {
error!(self.log, "Epoch or validator unknown"; "epoch" => epoch)
}
};
std::thread::sleep(Duration::from_millis(self.poll_interval_millis));
}
}
}

View File

@@ -0,0 +1,32 @@
use super::traits::{BeaconNode, BeaconNodeError};
use super::EpochDuties;
use bls::PublicKey;
use std::sync::RwLock;
use types::Epoch;
type ShufflingResult = Result<Option<EpochDuties>, BeaconNodeError>;
/// A test-only struct used to simulate a Beacon Node.
#[derive(Default)]
pub struct TestBeaconNode {
pub request_shuffling_input: RwLock<Option<(Epoch, PublicKey)>>,
pub request_shuffling_result: RwLock<Option<ShufflingResult>>,
}
impl TestBeaconNode {
/// Set the result to be returned when `request_shuffling` is called.
pub fn set_next_shuffling_result(&self, result: ShufflingResult) {
*self.request_shuffling_result.write().unwrap() = Some(result);
}
}
impl BeaconNode for TestBeaconNode {
/// Returns the value specified by the `set_next_shuffling_result`.
fn request_shuffling(&self, epoch: Epoch, public_key: &PublicKey) -> ShufflingResult {
*self.request_shuffling_input.write().unwrap() = Some((epoch, public_key.clone()));
match *self.request_shuffling_result.read().unwrap() {
Some(ref r) => r.clone(),
None => panic!("TestBeaconNode: produce_result == None"),
}
}
}

View File

@@ -0,0 +1,20 @@
use super::EpochDuties;
use bls::PublicKey;
use types::Epoch;
#[derive(Debug, PartialEq, Clone)]
pub enum BeaconNodeError {
RemoteFailure(String),
}
/// Defines the methods required to obtain a validators shuffling from a Beacon Node.
pub trait BeaconNode: Send + Sync {
/// Get the shuffling for the given epoch and public key.
///
/// Returns Ok(None) if the public key is unknown, or the shuffling for that epoch is unknown.
fn request_shuffling(
&self,
epoch: Epoch,
public_key: &PublicKey,
) -> Result<Option<EpochDuties>, BeaconNodeError>;
}

View File

@@ -0,0 +1,172 @@
use self::block_producer_service::{BeaconBlockGrpcClient, BlockProducerService};
use self::duties::{DutiesManager, DutiesManagerService, EpochDutiesMap};
use crate::config::ClientConfig;
use block_producer::{test_utils::LocalSigner, BlockProducer};
use bls::Keypair;
use clap::{App, Arg};
use grpcio::{ChannelBuilder, EnvBuilder};
use protos::services_grpc::{BeaconBlockServiceClient, ValidatorServiceClient};
use slog::{error, info, o, Drain};
use slot_clock::SystemTimeSlotClock;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;
use types::ChainSpec;
mod block_producer_service;
mod config;
mod duties;
fn main() {
// Logging
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::CompactFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();
let log = slog::Logger::root(drain, o!());
// CLI
let matches = App::new("Lighthouse Validator Client")
.version("0.0.1")
.author("Sigma Prime <contact@sigmaprime.io>")
.about("Eth 2.0 Validator Client")
.arg(
Arg::with_name("datadir")
.long("datadir")
.value_name("DIR")
.help("Data directory for keys and databases.")
.takes_value(true),
)
.arg(
Arg::with_name("server")
.long("server")
.value_name("server")
.help("Address to connect to BeaconNode.")
.takes_value(true),
)
.get_matches();
let mut config = ClientConfig::default();
// Custom datadir
if let Some(dir) = matches.value_of("datadir") {
config.data_dir = PathBuf::from(dir.to_string());
}
// Custom server port
if let Some(server_str) = matches.value_of("server") {
if let Ok(addr) = server_str.parse::<u16>() {
config.server = addr.to_string();
} else {
error!(log, "Invalid address"; "server" => server_str);
return;
}
}
// Log configuration
info!(log, "";
"data_dir" => &config.data_dir.to_str(),
"server" => &config.server);
// Beacon node gRPC beacon block endpoints.
let beacon_block_grpc_client = {
let env = Arc::new(EnvBuilder::new().build());
let ch = ChannelBuilder::new(env).connect(&config.server);
Arc::new(BeaconBlockServiceClient::new(ch))
};
// Beacon node gRPC validator endpoints.
let validator_grpc_client = {
let env = Arc::new(EnvBuilder::new().build());
let ch = ChannelBuilder::new(env).connect(&config.server);
Arc::new(ValidatorServiceClient::new(ch))
};
// Ethereum
//
// TODO: Permit loading a custom spec from file.
// https://github.com/sigp/lighthouse/issues/160
let spec = Arc::new(ChainSpec::foundation());
// Clock for determining the present slot.
// TODO: this shouldn't be a static time, instead it should be pulled from the beacon node.
// https://github.com/sigp/lighthouse/issues/160
let genesis_time = 1_549_935_547;
let slot_clock = {
info!(log, "Genesis time"; "unix_epoch_seconds" => genesis_time);
let clock = SystemTimeSlotClock::new(genesis_time, spec.slot_duration)
.expect("Unable to instantiate SystemTimeSlotClock.");
Arc::new(clock)
};
let poll_interval_millis = spec.slot_duration * 1000 / 10; // 10% epoch time precision.
info!(log, "Starting block producer service"; "polls_per_epoch" => spec.slot_duration * 1000 / poll_interval_millis);
/*
* Start threads.
*/
let mut threads = vec![];
// TODO: keypairs are randomly generated; they should be loaded from a file or generated.
// https://github.com/sigp/lighthouse/issues/160
let keypairs = vec![Keypair::random()];
for keypair in keypairs {
info!(log, "Starting validator services"; "validator" => keypair.pk.concatenated_hex_id());
let duties_map = Arc::new(EpochDutiesMap::new(spec.epoch_length));
// Spawn a new thread to maintain the validator's `EpochDuties`.
let duties_manager_thread = {
let spec = spec.clone();
let duties_map = duties_map.clone();
let slot_clock = slot_clock.clone();
let log = log.clone();
let beacon_node = validator_grpc_client.clone();
let pubkey = keypair.pk.clone();
thread::spawn(move || {
let manager = DutiesManager {
duties_map,
pubkey,
spec,
slot_clock,
beacon_node,
};
let mut duties_manager_service = DutiesManagerService {
manager,
poll_interval_millis,
log,
};
duties_manager_service.run();
})
};
// Spawn a new thread to perform block production for the validator.
let producer_thread = {
let spec = spec.clone();
let signer = Arc::new(LocalSigner::new(keypair.clone()));
let duties_map = duties_map.clone();
let slot_clock = slot_clock.clone();
let log = log.clone();
let client = Arc::new(BeaconBlockGrpcClient::new(beacon_block_grpc_client.clone()));
thread::spawn(move || {
let block_producer =
BlockProducer::new(spec, duties_map, slot_clock, client, signer);
let mut block_producer_service = BlockProducerService {
block_producer,
poll_interval_millis,
log,
};
block_producer_service.run();
})
};
threads.push((duties_manager_thread, producer_thread));
}
// Naively wait for all the threads to complete.
for tuple in threads {
let (manager, producer) = tuple;
let _ = producer.join();
let _ = manager.join();
}
}