Merge master and remove ssz length encoding from FakeBLS

This commit is contained in:
Kirk Baird
2019-03-29 11:47:22 +11:00
69 changed files with 4163 additions and 741 deletions

View File

@@ -1,7 +1,7 @@
[package]
name = "validator_client"
version = "0.1.0"
authors = ["Paul Hauner <paul@paulhauner.com>"]
authors = ["Paul Hauner <paul@paulhauner.com>", "Age Manning <Age@AgeManning.com>", "Luke Anderson <luke@lukeanderson.com.au>"]
edition = "2018"
[[bin]]
@@ -15,7 +15,9 @@ path = "src/lib.rs"
[dependencies]
block_proposer = { path = "../eth2/block_proposer" }
attester = { path = "../eth2/attester" }
bls = { path = "../eth2/utils/bls" }
ssz = { path = "../eth2/utils/ssz" }
clap = "2.32.0"
dirs = "1.0.3"
grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] }
@@ -26,5 +28,8 @@ types = { path = "../eth2/types" }
slog = "^2.2.3"
slog-term = "^2.4.0"
slog-async = "^2.3.0"
ssz = { path = "../eth2/utils/ssz" }
tokio = "0.1.18"
tokio-timer = "0.2.10"
error-chain = "0.12.0"
bincode = "^1.1.2"
futures = "0.1.25"

View File

@@ -0,0 +1,44 @@
use protos::services_grpc::AttestationServiceClient;
use std::sync::Arc;
use attester::{BeaconNode, BeaconNodeError, PublishOutcome};
use protos::services::ProduceAttestationRequest;
use types::{AttestationData, FreeAttestation, Slot};
pub struct AttestationGrpcClient {
client: Arc<AttestationServiceClient>,
}
impl AttestationGrpcClient {
pub fn new(client: Arc<AttestationServiceClient>) -> Self {
Self { client }
}
}
impl BeaconNode for AttestationGrpcClient {
fn produce_attestation(
&self,
slot: Slot,
shard: u64,
) -> Result<Option<AttestationData>, BeaconNodeError> {
let mut req = ProduceAttestationRequest::new();
req.set_slot(slot.as_u64());
req.set_shard(shard);
let reply = self
.client
.produce_attestation(&req)
.map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?;
// TODO: return correct Attestation
Err(BeaconNodeError::DecodeFailure)
}
fn publish_attestation(
&self,
free_attestation: FreeAttestation,
) -> Result<PublishOutcome, BeaconNodeError> {
// TODO: return correct PublishOutcome
Err(BeaconNodeError::DecodeFailure)
}
}

View File

@@ -0,0 +1,54 @@
mod attestation_grpc_client;
use attester::{Attester, BeaconNode, DutiesReader, PollOutcome as AttesterPollOutcome, Signer};
use slog::{error, info, warn, Logger};
use slot_clock::SlotClock;
use std::time::Duration;
pub use self::attestation_grpc_client::AttestationGrpcClient;
pub struct AttesterService<T: SlotClock, U: BeaconNode, V: DutiesReader, W: Signer> {
pub attester: Attester<T, U, V, W>,
pub poll_interval_millis: u64,
pub log: Logger,
}
impl<T: SlotClock, U: BeaconNode, V: DutiesReader, W: Signer> AttesterService<T, U, V, W> {
/// Run a loop which polls the Attester each `poll_interval_millis` millseconds.
///
/// Logs the results of the polls.
pub fn run(&mut self) {
loop {
match self.attester.poll() {
Err(error) => {
error!(self.log, "Attester poll error"; "error" => format!("{:?}", error))
}
Ok(AttesterPollOutcome::AttestationProduced(slot)) => {
info!(self.log, "Produced Attestation"; "slot" => slot)
}
Ok(AttesterPollOutcome::SlashableAttestationNotProduced(slot)) => {
warn!(self.log, "Slashable attestation was not produced"; "slot" => slot)
}
Ok(AttesterPollOutcome::AttestationNotRequired(slot)) => {
info!(self.log, "Attestation not required"; "slot" => slot)
}
Ok(AttesterPollOutcome::ProducerDutiesUnknown(slot)) => {
error!(self.log, "Attestation duties unknown"; "slot" => slot)
}
Ok(AttesterPollOutcome::SlotAlreadyProcessed(slot)) => {
warn!(self.log, "Attempted to re-process slot"; "slot" => slot)
}
Ok(AttesterPollOutcome::BeaconNodeUnableToProduceAttestation(slot)) => {
error!(self.log, "Beacon node unable to produce attestation"; "slot" => slot)
}
Ok(AttesterPollOutcome::SignerRejection(slot)) => {
error!(self.log, "The cryptographic signer refused to sign the attestation"; "slot" => slot)
}
Ok(AttesterPollOutcome::ValidatorIsUnknown(slot)) => {
error!(self.log, "The Beacon Node does not recognise the validator"; "slot" => slot)
}
};
std::thread::sleep(Duration::from_millis(self.poll_interval_millis));
}
}
}

View File

@@ -5,7 +5,7 @@ use protos::services::{
use protos::services_grpc::BeaconBlockServiceClient;
use ssz::{decode, ssz_encode};
use std::sync::Arc;
use types::{BeaconBlock, BeaconBlockBody, Eth1Data, Hash256, Signature, Slot};
use types::{BeaconBlock, Signature, Slot};
/// A newtype designed to wrap the gRPC-generated service so the `BeaconNode` trait may be
/// implemented upon it.
@@ -40,33 +40,11 @@ impl BeaconNode for BeaconBlockGrpcClient {
if reply.has_block() {
let block = reply.get_block();
let ssz = block.get_ssz();
let signature = decode::<Signature>(block.get_signature())
.map_err(|_| BeaconNodeError::DecodeFailure)?;
let block = decode::<BeaconBlock>(&ssz).map_err(|_| BeaconNodeError::DecodeFailure)?;
let randao_reveal = decode::<Signature>(block.get_randao_reveal())
.map_err(|_| BeaconNodeError::DecodeFailure)?;
// TODO: this conversion is incomplete; fix it.
Ok(Some(BeaconBlock {
slot: Slot::new(block.get_slot()),
previous_block_root: Hash256::zero(),
state_root: Hash256::zero(),
signature,
body: BeaconBlockBody {
randao_reveal,
eth1_data: Eth1Data {
deposit_root: Hash256::zero(),
block_hash: Hash256::zero(),
},
proposer_slashings: vec![],
attester_slashings: vec![],
attestations: vec![],
deposits: vec![],
voluntary_exits: vec![],
transfers: vec![],
},
}))
Ok(Some(block))
} else {
Ok(None)
}
@@ -79,12 +57,11 @@ impl BeaconNode for BeaconBlockGrpcClient {
fn publish_beacon_block(&self, block: BeaconBlock) -> Result<PublishOutcome, BeaconNodeError> {
let mut req = PublishBeaconBlockRequest::new();
let ssz = ssz_encode(&block);
// TODO: this conversion is incomplete; fix it.
let mut grpc_block = GrpcBeaconBlock::new();
grpc_block.set_slot(block.slot.as_u64());
grpc_block.set_block_root(vec![0]);
grpc_block.set_randao_reveal(ssz_encode(&block.body.randao_reveal));
grpc_block.set_signature(ssz_encode(&block.signature));
grpc_block.set_ssz(ssz);
req.set_block(grpc_block);

View File

@@ -22,15 +22,14 @@ pub struct Config {
const DEFAULT_PRIVATE_KEY_FILENAME: &str = "private.key";
impl Default for Config {
/// Build a new configuration from defaults.
fn default() -> Self {
let data_dir = {
let home = dirs::home_dir().expect("Unable to determine home directory.");
home.join(".lighthouse-validator")
};
fs::create_dir_all(&data_dir)
.unwrap_or_else(|_| panic!("Unable to create {:?}", &data_dir));
let server = "localhost:50051".to_string();
let server = "localhost:5051".to_string();
let spec = ChainSpec::foundation();
@@ -50,13 +49,14 @@ impl Config {
// Use the specified datadir, or default in the home directory
if let Some(datadir) = args.value_of("datadir") {
config.data_dir = PathBuf::from(datadir);
fs::create_dir_all(&config.data_dir)
.unwrap_or_else(|_| panic!("Unable to create {:?}", &config.data_dir));
info!(log, "Using custom data dir: {:?}", &config.data_dir);
};
fs::create_dir_all(&config.data_dir)
.unwrap_or_else(|_| panic!("Unable to create {:?}", &config.data_dir));
if let Some(srv) = args.value_of("server") {
//TODO: I don't think this parses correctly a server & port combo
//TODO: Validate the server value, to ensure it makes sense.
config.server = srv.to_string();
info!(log, "Using custom server: {:?}", &config.server);
};
@@ -67,10 +67,15 @@ impl Config {
config.spec = match spec_str {
"foundation" => ChainSpec::foundation(),
"few_validators" => ChainSpec::few_validators(),
"lighthouse_testnet" => ChainSpec::lighthouse_testnet(),
// Should be impossible due to clap's `possible_values(..)` function.
_ => unreachable!(),
};
};
// Log configuration
info!(log, "";
"data_dir" => &config.data_dir.to_str(),
"server" => &config.server);
Ok(config)
}

View File

@@ -1,90 +1,123 @@
use block_proposer::{DutiesReader, DutiesReaderError};
use std::collections::HashMap;
use std::sync::RwLock;
use types::{Epoch, Fork, Slot};
use std::ops::{Deref, DerefMut};
use types::{AttestationDuty, Epoch, PublicKey, Slot};
/// When work needs to be performed by a validator, this type is given back to the main service
/// which indicates all the information that required to process the work.
///
/// Note: This is calculated per slot, so a validator knows which slot is related to this struct.
#[derive(Debug, Clone)]
pub struct WorkInfo {
/// Validator needs to produce a block.
pub produce_block: bool,
/// Validator needs to produce an attestation. This supplies the required attestation data.
pub attestation_duty: Option<AttestationDuty>,
}
/// The information required for a validator to propose and attest during some epoch.
///
/// Generally obtained from a Beacon Node, this information contains the validators canonical index
/// (thier sequence in the global validator induction process) and the "shuffling" for that index
/// (their sequence in the global validator induction process) and the "shuffling" for that index
/// for some epoch.
#[derive(Debug, PartialEq, Clone, Copy, Default)]
pub struct EpochDuties {
pub validator_index: u64,
pub struct EpochDuty {
pub block_production_slot: Option<Slot>,
// Future shard info
pub attestation_slot: Slot,
pub attestation_shard: u64,
pub committee_index: u64,
}
impl EpochDuties {
/// Returns `true` if the supplied `slot` is a slot in which the validator should produce a
/// block.
pub fn is_block_production_slot(&self, slot: Slot) -> bool {
match self.block_production_slot {
impl EpochDuty {
/// Returns `WorkInfo` if work needs to be done in the supplied `slot`
pub fn is_work_slot(&self, slot: Slot) -> Option<WorkInfo> {
// if validator is required to produce a slot return true
let produce_block = match self.block_production_slot {
Some(s) if s == slot => true,
_ => false,
};
// if the validator is required to attest to a shard, create the data
let mut attestation_duty = None;
if self.attestation_slot == slot {
attestation_duty = Some(AttestationDuty {
slot,
shard: self.attestation_shard,
committee_index: self.committee_index as usize,
});
}
if produce_block | attestation_duty.is_some() {
return Some(WorkInfo {
produce_block,
attestation_duty,
});
}
None
}
}
/// Maps a list of public keys (many validators) to an EpochDuty.
pub type EpochDuties = HashMap<PublicKey, Option<EpochDuty>>;
pub enum EpochDutiesMapError {
Poisoned,
UnknownEpoch,
UnknownValidator,
}
/// Maps an `epoch` to some `EpochDuties` for a single validator.
pub struct EpochDutiesMap {
pub slots_per_epoch: u64,
pub map: RwLock<HashMap<Epoch, EpochDuties>>,
pub map: HashMap<Epoch, EpochDuties>,
}
impl EpochDutiesMap {
pub fn new(slots_per_epoch: u64) -> Self {
Self {
slots_per_epoch,
map: RwLock::new(HashMap::new()),
map: HashMap::new(),
}
}
pub fn get(&self, epoch: Epoch) -> Result<Option<EpochDuties>, EpochDutiesMapError> {
let map = self.map.read().map_err(|_| EpochDutiesMapError::Poisoned)?;
match map.get(&epoch) {
Some(duties) => Ok(Some(*duties)),
None => Ok(None),
}
}
pub fn insert(
&self,
epoch: Epoch,
epoch_duties: EpochDuties,
) -> Result<Option<EpochDuties>, EpochDutiesMapError> {
let mut map = self
.map
.write()
.map_err(|_| EpochDutiesMapError::Poisoned)?;
Ok(map.insert(epoch, epoch_duties))
}
}
impl DutiesReader for EpochDutiesMap {
fn is_block_production_slot(&self, slot: Slot) -> Result<bool, DutiesReaderError> {
// Expose the hashmap methods
impl Deref for EpochDutiesMap {
type Target = HashMap<Epoch, EpochDuties>;
fn deref(&self) -> &Self::Target {
&self.map
}
}
impl DerefMut for EpochDutiesMap {
fn deref_mut(&mut self) -> &mut HashMap<Epoch, EpochDuties> {
&mut self.map
}
}
impl EpochDutiesMap {
/// Checks if the validator has work to do.
pub fn is_work_slot(
&self,
slot: Slot,
pubkey: &PublicKey,
) -> Result<Option<WorkInfo>, EpochDutiesMapError> {
let epoch = slot.epoch(self.slots_per_epoch);
let map = self.map.read().map_err(|_| DutiesReaderError::Poisoned)?;
let duties = map
let epoch_duties = self
.map
.get(&epoch)
.ok_or_else(|| DutiesReaderError::UnknownEpoch)?;
Ok(duties.is_block_production_slot(slot))
}
fn fork(&self) -> Result<Fork, DutiesReaderError> {
// TODO: this is garbage data.
//
// It will almost certainly cause signatures to fail verification.
Ok(Fork {
previous_version: [0; 4],
current_version: [0; 4],
epoch: Epoch::new(0),
})
.ok_or_else(|| EpochDutiesMapError::UnknownEpoch)?;
if let Some(epoch_duty) = epoch_duties.get(pubkey) {
if let Some(duty) = epoch_duty {
// Retrieves the duty for a validator at a given slot
return Ok(duty.is_work_slot(slot));
} else {
// the validator isn't active
return Ok(None);
}
} else {
// validator isn't known
return Err(EpochDutiesMapError::UnknownValidator);
}
}
}

View File

@@ -1,54 +1,60 @@
use super::epoch_duties::{EpochDuties, EpochDuty};
use super::traits::{BeaconNode, BeaconNodeError};
use super::EpochDuties;
use protos::services::{ProposeBlockSlotRequest, PublicKey as IndexRequest};
use grpcio::CallOption;
use protos::services::{GetDutiesRequest, Validators};
use protos::services_grpc::ValidatorServiceClient;
use ssz::ssz_encode;
use std::collections::HashMap;
use std::time::Duration;
use types::{Epoch, PublicKey, Slot};
impl BeaconNode for ValidatorServiceClient {
/// Request the shuffling from the Beacon Node (BN).
///
/// As this function takes a `PublicKey`, it will first attempt to resolve the public key into
/// a validator index, then call the BN for production/attestation duties.
///
/// Note: presently only block production information is returned.
fn request_shuffling(
/// Requests all duties (block signing and committee attesting) from the Beacon Node (BN).
fn request_duties(
&self,
epoch: Epoch,
public_key: &PublicKey,
) -> Result<Option<EpochDuties>, BeaconNodeError> {
// Lookup the validator index for the supplied public key.
let validator_index = {
let mut req = IndexRequest::new();
req.set_public_key(ssz_encode(public_key).to_vec());
let resp = self
.validator_index(&req)
.map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?;
resp.get_index()
};
let mut req = ProposeBlockSlotRequest::new();
req.set_validator_index(validator_index);
pubkeys: &[PublicKey],
) -> Result<EpochDuties, BeaconNodeError> {
// Get the required duties from all validators
// build the request
let mut req = GetDutiesRequest::new();
req.set_epoch(epoch.as_u64());
let mut validators = Validators::new();
validators.set_public_keys(pubkeys.iter().map(|v| ssz_encode(v)).collect());
req.set_validators(validators);
// set a timeout for requests
// let call_opt = CallOption::default().timeout(Duration::from_secs(2));
// send the request, get the duties reply
let reply = self
.propose_block_slot(&req)
.get_validator_duties(&req)
.map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?;
let block_production_slot = if reply.has_slot() {
Some(reply.get_slot())
} else {
None
};
let block_production_slot = match block_production_slot {
Some(slot) => Some(Slot::new(slot)),
None => None,
};
Ok(Some(EpochDuties {
validator_index,
block_production_slot,
}))
let mut epoch_duties: HashMap<PublicKey, Option<EpochDuty>> = HashMap::new();
for (index, validator_duty) in reply.get_active_validators().iter().enumerate() {
if !validator_duty.has_duty() {
// validator is inactive
epoch_duties.insert(pubkeys[index].clone(), None);
continue;
}
// active validator
let active_duty = validator_duty.get_duty();
let block_production_slot = {
if active_duty.has_block_production_slot() {
Some(Slot::from(active_duty.get_block_production_slot()))
} else {
None
}
};
let epoch_duty = EpochDuty {
block_production_slot,
attestation_slot: Slot::from(active_duty.get_attestation_slot()),
attestation_shard: active_duty.get_attestation_shard(),
committee_index: active_duty.get_committee_index(),
};
epoch_duties.insert(pubkeys[index].clone(), Some(epoch_duty));
}
Ok(epoch_duties)
}
}

View File

@@ -1,21 +1,21 @@
mod epoch_duties;
mod grpc;
mod service;
#[cfg(test)]
mod test_node;
// TODO: reintroduce tests
//#[cfg(test)]
//mod test_node;
mod traits;
pub use self::epoch_duties::EpochDutiesMap;
use self::epoch_duties::{EpochDuties, EpochDutiesMapError};
pub use self::service::DutiesManagerService;
pub use self::epoch_duties::{EpochDutiesMap, WorkInfo};
use self::traits::{BeaconNode, BeaconNodeError};
use bls::PublicKey;
use slot_clock::SlotClock;
use futures::Async;
use slog::{debug, error, info};
use std::sync::Arc;
use types::{ChainSpec, Epoch};
use std::sync::RwLock;
use types::{Epoch, PublicKey, Slot};
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum PollOutcome {
#[derive(Debug, PartialEq, Clone)]
pub enum UpdateOutcome {
/// The `EpochDuties` were not updated during this poll.
NoChange(Epoch),
/// The `EpochDuties` for the `epoch` were previously unknown, but obtained in the poll.
@@ -23,79 +23,117 @@ pub enum PollOutcome {
/// New `EpochDuties` were obtained, different to those which were previously known. This is
/// likely to be the result of chain re-organisation.
DutiesChanged(Epoch, EpochDuties),
/// The Beacon Node was unable to return the duties as the validator is unknown, or the
/// shuffling for the epoch is unknown.
UnknownValidatorOrEpoch(Epoch),
}
#[derive(Debug, PartialEq)]
pub enum Error {
SlotClockError,
SlotUnknowable,
DutiesMapPoisoned,
EpochMapPoisoned,
BeaconNodeError(BeaconNodeError),
UnknownEpoch,
UnknownValidator,
}
/// A polling state machine which ensures the latest `EpochDuties` are obtained from the Beacon
/// Node.
///
/// There is a single `DutiesManager` per validator instance.
pub struct DutiesManager<T: SlotClock, U: BeaconNode> {
pub duties_map: Arc<EpochDutiesMap>,
/// The validator's public key.
pub pubkey: PublicKey,
pub spec: Arc<ChainSpec>,
pub slot_clock: Arc<T>,
/// This keeps track of all validator keys and required voting slots.
pub struct DutiesManager<U: BeaconNode> {
pub duties_map: RwLock<EpochDutiesMap>,
/// A list of all public keys known to the validator service.
pub pubkeys: Vec<PublicKey>,
pub beacon_node: Arc<U>,
}
impl<T: SlotClock, U: BeaconNode> DutiesManager<T, U> {
/// Poll the Beacon Node for `EpochDuties`.
impl<U: BeaconNode> DutiesManager<U> {
/// Check the Beacon Node for `EpochDuties`.
///
/// The present `epoch` will be learned from the supplied `SlotClock`. In production this will
/// be a wall-clock (e.g., system time, remote server time, etc.).
pub fn poll(&self) -> Result<PollOutcome, Error> {
let slot = self
.slot_clock
.present_slot()
.map_err(|_| Error::SlotClockError)?
.ok_or(Error::SlotUnknowable)?;
let epoch = slot.epoch(self.spec.slots_per_epoch);
if let Some(duties) = self.beacon_node.request_shuffling(epoch, &self.pubkey)? {
fn update(&self, epoch: Epoch) -> Result<UpdateOutcome, Error> {
let duties = self.beacon_node.request_duties(epoch, &self.pubkeys)?;
{
// If these duties were known, check to see if they're updates or identical.
let result = if let Some(known_duties) = self.duties_map.get(epoch)? {
if known_duties == duties {
Ok(PollOutcome::NoChange(epoch))
} else {
Ok(PollOutcome::DutiesChanged(epoch, duties))
if let Some(known_duties) = self.duties_map.read()?.get(&epoch) {
if *known_duties == duties {
return Ok(UpdateOutcome::NoChange(epoch));
}
} else {
Ok(PollOutcome::NewDuties(epoch, duties))
};
self.duties_map.insert(epoch, duties)?;
result
} else {
Ok(PollOutcome::UnknownValidatorOrEpoch(epoch))
}
}
if !self.duties_map.read()?.contains_key(&epoch) {
//TODO: Remove clone by removing duties from outcome
self.duties_map.write()?.insert(epoch, duties.clone());
return Ok(UpdateOutcome::NewDuties(epoch, duties));
}
// duties have changed
//TODO: Duties could be large here. Remove from display and avoid the clone.
self.duties_map.write()?.insert(epoch, duties.clone());
return Ok(UpdateOutcome::DutiesChanged(epoch, duties));
}
/// A future wrapping around `update()`. This will perform logic based upon the update
/// process and complete once the update has completed.
pub fn run_update(&self, epoch: Epoch, log: slog::Logger) -> Result<Async<()>, ()> {
match self.update(epoch) {
Err(error) => error!(log, "Epoch duties poll error"; "error" => format!("{:?}", error)),
Ok(UpdateOutcome::NoChange(epoch)) => {
debug!(log, "No change in duties"; "epoch" => epoch)
}
Ok(UpdateOutcome::DutiesChanged(epoch, duties)) => {
info!(log, "Duties changed (potential re-org)"; "epoch" => epoch, "duties" => format!("{:?}", duties))
}
Ok(UpdateOutcome::NewDuties(epoch, duties)) => {
info!(log, "New duties obtained"; "epoch" => epoch, "duties" => format!("{:?}", duties))
}
};
Ok(Async::Ready(()))
}
/// Returns a list of (Public, WorkInfo) indicating all the validators that have work to perform
/// this slot.
pub fn get_current_work(&self, slot: Slot) -> Option<Vec<(PublicKey, WorkInfo)>> {
let mut current_work: Vec<(PublicKey, WorkInfo)> = Vec::new();
// if the map is poisoned, return None
let duties = self.duties_map.read().ok()?;
for validator_pk in &self.pubkeys {
match duties.is_work_slot(slot, &validator_pk) {
Ok(Some(work_type)) => current_work.push((validator_pk.clone(), work_type)),
Ok(None) => {} // No work for this validator
Err(_) => {} // Unknown epoch or validator, no work
}
}
if current_work.is_empty() {
return None;
}
Some(current_work)
}
}
//TODO: Use error_chain to handle errors
impl From<BeaconNodeError> for Error {
fn from(e: BeaconNodeError) -> Error {
Error::BeaconNodeError(e)
}
}
//TODO: Use error_chain to handle errors
impl<T> From<std::sync::PoisonError<T>> for Error {
fn from(_e: std::sync::PoisonError<T>) -> Error {
Error::DutiesMapPoisoned
}
}
impl From<EpochDutiesMapError> for Error {
fn from(e: EpochDutiesMapError) -> Error {
match e {
EpochDutiesMapError::Poisoned => Error::EpochMapPoisoned,
EpochDutiesMapError::UnknownEpoch => Error::UnknownEpoch,
EpochDutiesMapError::UnknownValidator => Error::UnknownValidator,
}
}
}
/* TODO: Modify tests for new Duties Manager form
#[cfg(test)]
mod tests {
use super::test_node::TestBeaconNode;
@@ -109,6 +147,7 @@ mod tests {
//
// These tests should serve as a good example for future tests.
#[test]
pub fn polling() {
let spec = Arc::new(ChainSpec::foundation());
@@ -159,3 +198,4 @@ mod tests {
);
}
}
*/

View File

@@ -1,40 +0,0 @@
use super::traits::BeaconNode;
use super::{DutiesManager, PollOutcome};
use slog::{debug, error, info, Logger};
use slot_clock::SlotClock;
use std::time::Duration;
pub struct DutiesManagerService<T: SlotClock, U: BeaconNode> {
pub manager: DutiesManager<T, U>,
pub poll_interval_millis: u64,
pub log: Logger,
}
impl<T: SlotClock, U: BeaconNode> DutiesManagerService<T, U> {
/// Run a loop which polls the manager each `poll_interval_millis` milliseconds.
///
/// Logs the results of the polls.
pub fn run(&mut self) {
loop {
match self.manager.poll() {
Err(error) => {
error!(self.log, "Epoch duties poll error"; "error" => format!("{:?}", error))
}
Ok(PollOutcome::NoChange(epoch)) => {
debug!(self.log, "No change in duties"; "epoch" => epoch)
}
Ok(PollOutcome::DutiesChanged(epoch, duties)) => {
info!(self.log, "Duties changed (potential re-org)"; "epoch" => epoch, "duties" => format!("{:?}", duties))
}
Ok(PollOutcome::NewDuties(epoch, duties)) => {
info!(self.log, "New duties obtained"; "epoch" => epoch, "duties" => format!("{:?}", duties))
}
Ok(PollOutcome::UnknownValidatorOrEpoch(epoch)) => {
error!(self.log, "Epoch or validator unknown"; "epoch" => epoch)
}
};
std::thread::sleep(Duration::from_millis(self.poll_interval_millis));
}
}
}

View File

@@ -1,6 +1,5 @@
use super::EpochDuties;
use bls::PublicKey;
use types::Epoch;
use types::{Epoch, PublicKey};
#[derive(Debug, PartialEq, Clone)]
pub enum BeaconNodeError {
@@ -9,12 +8,13 @@ pub enum BeaconNodeError {
/// Defines the methods required to obtain a validators shuffling from a Beacon Node.
pub trait BeaconNode: Send + Sync {
/// Get the shuffling for the given epoch and public key.
/// Gets the duties for all validators.
///
/// Returns Ok(None) if the public key is unknown, or the shuffling for that epoch is unknown.
fn request_shuffling(
/// Returns a vector of EpochDuties for each validator public key. The entry will be None for
/// validators that are not activated.
fn request_duties(
&self,
epoch: Epoch,
public_key: &PublicKey,
) -> Result<Option<EpochDuties>, BeaconNodeError>;
pubkeys: &[PublicKey],
) -> Result<EpochDuties, BeaconNodeError>;
}

View File

@@ -0,0 +1,22 @@
use slot_clock;
use error_chain::{
error_chain, error_chain_processing, impl_error_chain_kind, impl_error_chain_processed,
impl_extract_backtrace,
};
error_chain! {
links { }
errors {
SlotClockError(e: slot_clock::SystemTimeSlotClockError) {
description("Error reading system time"),
display("SlotClockError: '{:?}'", e)
}
SystemTimeError(t: String ) {
description("Error reading system time"),
display("SystemTimeError: '{}'", t)
}
}
}

View File

@@ -1,18 +1,14 @@
use self::block_producer_service::{BeaconBlockGrpcClient, BlockProducerService};
use self::duties::{DutiesManager, DutiesManagerService, EpochDutiesMap};
use crate::config::Config;
use block_proposer::{test_utils::LocalSigner, BlockProducer};
use clap::{App, Arg};
use grpcio::{ChannelBuilder, EnvBuilder};
use protos::services_grpc::{BeaconBlockServiceClient, ValidatorServiceClient};
use slog::{info, o, Drain};
use slot_clock::SystemTimeSlotClock;
use std::sync::Arc;
use std::thread;
mod attester_service;
mod block_producer_service;
mod config;
mod duties;
pub mod error;
mod service;
use crate::config::Config as ValidatorClientConfig;
use clap::{App, Arg};
use service::Service as ValidatorService;
use slog::{error, info, o, Drain};
fn main() {
// Logging
@@ -47,116 +43,17 @@ fn main() {
.short("s")
.help("Configuration of Beacon Chain")
.takes_value(true)
.possible_values(&["foundation", "few_validators"])
.default_value("foundation"),
.possible_values(&["foundation", "few_validators", "lighthouse_testnet"])
.default_value("lighthouse_testnet"),
)
.get_matches();
let config = Config::parse_args(&matches, &log)
let config = ValidatorClientConfig::parse_args(&matches, &log)
.expect("Unable to build a configuration for the validator client.");
// Log configuration
info!(log, "Configuration parameters:";
"data_dir" => &config.data_dir.to_str(),
"server" => &config.server);
// Beacon node gRPC beacon block endpoints.
let beacon_block_grpc_client = {
let env = Arc::new(EnvBuilder::new().build());
let ch = ChannelBuilder::new(env).connect(&config.server);
Arc::new(BeaconBlockServiceClient::new(ch))
};
// Beacon node gRPC validator endpoints.
let validator_grpc_client = {
let env = Arc::new(EnvBuilder::new().build());
let ch = ChannelBuilder::new(env).connect(&config.server);
Arc::new(ValidatorServiceClient::new(ch))
};
// Spec
let spec = Arc::new(config.spec.clone());
// Clock for determining the present slot.
// TODO: this shouldn't be a static time, instead it should be pulled from the beacon node.
// https://github.com/sigp/lighthouse/issues/160
let genesis_time = 1_549_935_547;
let slot_clock = {
info!(log, "Genesis time"; "unix_epoch_seconds" => genesis_time);
let clock = SystemTimeSlotClock::new(genesis_time, spec.seconds_per_slot)
.expect("Unable to instantiate SystemTimeSlotClock.");
Arc::new(clock)
};
let poll_interval_millis = spec.seconds_per_slot * 1000 / 10; // 10% epoch time precision.
info!(log, "Starting block producer service"; "polls_per_epoch" => spec.seconds_per_slot * 1000 / poll_interval_millis);
let keypairs = config.fetch_keys(&log)
.expect("No key pairs found in configuration, they must first be generated with: account_manager generate.");
/*
* Start threads.
*/
let mut threads = vec![];
for keypair in keypairs {
info!(log, "Starting validator services"; "validator" => keypair.pk.concatenated_hex_id());
let duties_map = Arc::new(EpochDutiesMap::new(spec.slots_per_epoch));
// Spawn a new thread to maintain the validator's `EpochDuties`.
let duties_manager_thread = {
let spec = spec.clone();
let duties_map = duties_map.clone();
let slot_clock = slot_clock.clone();
let log = log.clone();
let beacon_node = validator_grpc_client.clone();
let pubkey = keypair.pk.clone();
thread::spawn(move || {
let manager = DutiesManager {
duties_map,
pubkey,
spec,
slot_clock,
beacon_node,
};
let mut duties_manager_service = DutiesManagerService {
manager,
poll_interval_millis,
log,
};
duties_manager_service.run();
})
};
// Spawn a new thread to perform block production for the validator.
let producer_thread = {
let spec = spec.clone();
let signer = Arc::new(LocalSigner::new(keypair.clone()));
let duties_map = duties_map.clone();
let slot_clock = slot_clock.clone();
let log = log.clone();
let client = Arc::new(BeaconBlockGrpcClient::new(beacon_block_grpc_client.clone()));
thread::spawn(move || {
let block_producer =
BlockProducer::new(spec, duties_map, slot_clock, client, signer);
let mut block_producer_service = BlockProducerService {
block_producer,
poll_interval_millis,
log,
};
block_producer_service.run();
})
};
threads.push((duties_manager_thread, producer_thread));
}
// Naively wait for all the threads to complete.
for tuple in threads {
let (manager, producer) = tuple;
let _ = producer.join();
let _ = manager.join();
// start the validator service.
match ValidatorService::start(config, log.clone()) {
Ok(_) => info!(log, "Validator client shutdown successfully."),
Err(e) => error!(log, "Validator exited due to: {}", e.to_string()),
}
}

View File

@@ -0,0 +1,328 @@
/// The validator service. Connects to a beacon node and signs blocks when required.
use crate::attester_service::{AttestationGrpcClient, AttesterService};
use crate::block_producer_service::{BeaconBlockGrpcClient, BlockProducerService};
use crate::config::Config as ValidatorConfig;
use crate::duties::UpdateOutcome;
use crate::duties::{DutiesManager, EpochDutiesMap};
use crate::error as error_chain;
use crate::error::ErrorKind;
use attester::test_utils::EpochMap;
use attester::{test_utils::LocalSigner as AttesterLocalSigner, Attester};
use block_proposer::{test_utils::LocalSigner as BlockProposerLocalSigner, BlockProducer};
use bls::Keypair;
use grpcio::{ChannelBuilder, EnvBuilder};
use protos::services::Empty;
use protos::services_grpc::{
AttestationServiceClient, BeaconBlockServiceClient, BeaconNodeServiceClient,
ValidatorServiceClient,
};
use slog::{debug, error, info, warn};
use slot_clock::{SlotClock, SystemTimeSlotClock};
use std::sync::Arc;
use std::sync::RwLock;
use std::time::{Duration, Instant, SystemTime};
use tokio::prelude::*;
use tokio::runtime::Builder;
use tokio::timer::Interval;
use tokio_timer::clock::Clock;
use types::test_utils::generate_deterministic_keypairs;
use types::{Epoch, Fork, Slot};
//TODO: This service should be simplified in the future. Can be made more steamlined.
/// The validator service. This is the main thread that executes and maintains validator
/// duties.
pub struct Service {
/// The node we currently connected to.
connected_node_version: String,
/// The chain id we are processing on.
chain_id: u16,
/// The fork state we processing on.
fork: Fork,
/// The slot clock for this service.
slot_clock: SystemTimeSlotClock,
/// The current slot we are processing.
current_slot: Slot,
/// The number of slots per epoch to allow for converting slots to epochs.
slots_per_epoch: u64,
// GRPC Clients
/// The beacon block GRPC client.
beacon_block_client: Arc<BeaconBlockServiceClient>,
/// The validator GRPC client.
validator_client: Arc<ValidatorServiceClient>,
/// The attester GRPC client.
attester_client: Arc<AttestationServiceClient>,
/// The validator client logger.
log: slog::Logger,
}
impl Service {
/// Initial connection to the beacon node to determine its properties.
///
/// This tries to connect to a beacon node. Once connected, it initialised the gRPC clients
/// and returns an instance of the service.
fn initialize_service(
config: &ValidatorConfig,
log: slog::Logger,
) -> error_chain::Result<Self> {
// initialise the beacon node client to check for a connection
let env = Arc::new(EnvBuilder::new().build());
// Beacon node gRPC beacon node endpoints.
let beacon_node_client = {
let ch = ChannelBuilder::new(env.clone()).connect(&config.server);
Arc::new(BeaconNodeServiceClient::new(ch))
};
// retrieve node information
let node_info = loop {
match beacon_node_client.info(&Empty::new()) {
Err(e) => {
warn!(log, "Could not connect to node. Error: {}", e);
info!(log, "Retrying in 5 seconds...");
std::thread::sleep(Duration::from_secs(5));
continue;
}
Ok(info) => {
if SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
< info.genesis_time
{
warn!(
log,
"Beacon Node's genesis time is in the future. No work to do.\n Exiting"
);
return Err("Genesis time in the future".into());
}
break info;
}
};
};
// build requisite objects to form Self
let genesis_time = node_info.get_genesis_time();
let genesis_slot = Slot::from(node_info.get_genesis_slot());
info!(log,"Beacon node connected"; "Node Version" => node_info.version.clone(), "Chain ID" => node_info.chain_id, "Genesis time" => genesis_time);
let proto_fork = node_info.get_fork();
let mut previous_version: [u8; 4] = [0; 4];
let mut current_version: [u8; 4] = [0; 4];
previous_version.copy_from_slice(&proto_fork.get_previous_version()[..4]);
current_version.copy_from_slice(&proto_fork.get_current_version()[..4]);
let fork = Fork {
previous_version,
current_version,
epoch: Epoch::from(proto_fork.get_epoch()),
};
// initialize the RPC clients
// Beacon node gRPC beacon block endpoints.
let beacon_block_client = {
let ch = ChannelBuilder::new(env.clone()).connect(&config.server);
Arc::new(BeaconBlockServiceClient::new(ch))
};
// Beacon node gRPC validator endpoints.
let validator_client = {
let ch = ChannelBuilder::new(env.clone()).connect(&config.server);
Arc::new(ValidatorServiceClient::new(ch))
};
//Beacon node gRPC attester endpoints.
let attester_client = {
let ch = ChannelBuilder::new(env.clone()).connect(&config.server);
Arc::new(AttestationServiceClient::new(ch))
};
// build the validator slot clock
let slot_clock =
SystemTimeSlotClock::new(genesis_slot, genesis_time, config.spec.seconds_per_slot)
.expect("Unable to instantiate SystemTimeSlotClock.");
let current_slot = slot_clock
.present_slot()
.map_err(|e| ErrorKind::SlotClockError(e))?
.expect("Genesis must be in the future");
Ok(Self {
connected_node_version: node_info.version,
chain_id: node_info.chain_id as u16,
fork,
slot_clock,
current_slot,
slots_per_epoch: config.spec.slots_per_epoch,
beacon_block_client,
validator_client,
attester_client,
log,
})
}
/// Initialise the service then run the core thread.
pub fn start(config: ValidatorConfig, log: slog::Logger) -> error_chain::Result<()> {
// connect to the node and retrieve its properties and initialize the gRPC clients
let service = Service::initialize_service(&config, log)?;
// we have connected to a node and established its parameters. Spin up the core service
// set up the validator service runtime
let mut runtime = Builder::new()
.clock(Clock::system())
.name_prefix("validator-client-")
.build()
.map_err(|e| format!("Tokio runtime failed: {}", e))?;
let duration_to_next_slot = service
.slot_clock
.duration_to_next_slot()
.map_err(|e| format!("System clock error: {:?}", e))?
.expect("Cannot start before genesis");
// set up the validator work interval - start at next slot and proceed every slot
let interval = {
// Set the interval to start at the next slot, and every slot after
let slot_duration = Duration::from_secs(config.spec.seconds_per_slot);
//TODO: Handle checked add correctly
Interval::new(Instant::now() + duration_to_next_slot, slot_duration)
};
/* kick off core service */
// generate keypairs
// TODO: keypairs are randomly generated; they should be loaded from a file or generated.
// https://github.com/sigp/lighthouse/issues/160
let keypairs = Arc::new(generate_deterministic_keypairs(8));
/* build requisite objects to pass to core thread */
// Builds a mapping of Epoch -> Map(PublicKey, EpochDuty)
// where EpochDuty contains slot numbers and attestation data that each validator needs to
// produce work on.
let duties_map = RwLock::new(EpochDutiesMap::new(config.spec.slots_per_epoch));
// builds a manager which maintains the list of current duties for all known validators
// and can check when a validator needs to perform a task.
let manager = Arc::new(DutiesManager {
duties_map,
pubkeys: keypairs.iter().map(|keypair| keypair.pk.clone()).collect(),
beacon_node: service.validator_client.clone(),
});
// run the core thread
runtime.block_on(
interval
.for_each(move |_| {
let log = service.log.clone();
/* get the current slot and epoch */
let current_slot = match service.slot_clock.present_slot() {
Err(e) => {
error!(log, "SystemTimeError {:?}", e);
return Ok(());
}
Ok(slot) => slot.expect("Genesis is in the future"),
};
let current_epoch = current_slot.epoch(service.slots_per_epoch);
debug_assert!(
current_slot > service.current_slot,
"The Timer should poll a new slot"
);
info!(log, "Processing slot: {}", current_slot.as_u64());
/* check for new duties */
let cloned_manager = manager.clone();
let cloned_log = log.clone();
// spawn a new thread separate to the runtime
std::thread::spawn(move || {
cloned_manager.run_update(current_epoch.clone(), cloned_log.clone());
dbg!("Finished thread");
});
/* execute any specified duties */
if let Some(work) = manager.get_current_work(current_slot) {
for (_public_key, work_type) in work {
if work_type.produce_block {
// TODO: Produce a beacon block in a new thread
}
if work_type.attestation_duty.is_some() {
// available AttestationDuty info
let attestation_duty =
work_type.attestation_duty.expect("Cannot be None");
//TODO: Produce an attestation in a new thread
}
}
}
Ok(())
})
.map_err(|e| format!("Service thread failed: {:?}", e)),
);
// completed a slot process
Ok(())
}
/*
// Spawn a new thread to perform block production for the validator.
let producer_thread = {
let spec = spec.clone();
let signer = Arc::new(BlockProposerLocalSigner::new(keypair.clone()));
let duties_map = duties_map.clone();
let slot_clock = slot_clock.clone();
let log = log.clone();
let client = Arc::new(BeaconBlockGrpcClient::new(beacon_block_grpc_client.clone()));
thread::spawn(move || {
let block_producer =
BlockProducer::new(spec, duties_map, slot_clock, client, signer);
let mut block_producer_service = BlockProducerService {
block_producer,
poll_interval_millis,
log,
};
block_producer_service.run();
})
};
// Spawn a new thread for attestation for the validator.
let attester_thread = {
let signer = Arc::new(AttesterLocalSigner::new(keypair.clone()));
let epoch_map = epoch_map_for_attester.clone();
let slot_clock = slot_clock.clone();
let log = log.clone();
let client = Arc::new(AttestationGrpcClient::new(attester_grpc_client.clone()));
thread::spawn(move || {
let attester = Attester::new(epoch_map, slot_clock, client, signer);
let mut attester_service = AttesterService {
attester,
poll_interval_millis,
log,
};
attester_service.run();
})
};
threads.push((duties_manager_thread, producer_thread, attester_thread));
}
// Naively wait for all the threads to complete.
for tuple in threads {
let (manager, producer, attester) = tuple;
let _ = producer.join();
let _ = manager.join();
let _ = attester.join();
}
*/
}