diff --git a/.travis.yml b/.travis.yml index 8518c5d450..d43d21a005 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,15 @@ language: rust +before_install: + - curl -OL https://github.com/google/protobuf/releases/download/v3.4.0/protoc-3.4.0-linux-x86_64.zip + - unzip protoc-3.4.0-linux-x86_64.zip -d protoc3 + - sudo mv protoc3/bin/* /usr/local/bin/ + - sudo mv protoc3/include/* /usr/local/include/ + - sudo chown $USER /usr/local/bin/protoc + - sudo chown -R $USER /usr/local/include/google script: - - cargo fmt --all -- --check - cargo build --verbose --all - cargo test --verbose --all + - cargo fmt --all -- --check rust: - stable - beta diff --git a/Cargo.toml b/Cargo.toml index a12317daa5..1ca11dcbb6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,36 +1,3 @@ -[package] -name = "lighthouse" -version = "0.0.1" -authors = ["Paul Hauner "] -edition = "2018" - -[dependencies] -blake2-rfc = "0.2.18" -bls-aggregates = { git = "https://github.com/sigp/signature-schemes" } -bytes = "" -crypto-mac = "^0.6.2" -clap = "2.32.0" -db = { path = "lighthouse/db" } -dirs = "1.0.3" -futures = "0.1.23" -rand = "0.3" -rlp = { git = "https://github.com/paritytech/parity-common" } -slog = "^2.2.3" -slog-term = "^2.4.0" -slog-async = "^2.3.0" -tokio = "0.1" - -[dependencies.pairing] -git = "https://github.com/mmaker/pairing" -branch = "feature/hashing" - -[patch.crates-io] -ring = { git = "https://github.com/paritytech/ring" } - -[[bin]] -path = "lighthouse/main.rs" -name = "lighthouse" - [workspace] members = [ "beacon_chain/attestation_validation", @@ -47,6 +14,8 @@ members = [ "beacon_chain/utils/vec_shuffle", "beacon_chain/validator_induction", "beacon_chain/validator_shuffling", - "lighthouse/beacon_chain", - "lighthouse/db", + "beacon_node", + "beacon_node/db", + "protos", + "validator_client", ] diff --git a/README.md b/README.md index 01ebf68538..9ed5acecc5 100644 --- a/README.md +++ b/README.md @@ -113,6 +113,10 @@ A few basic steps are needed to get set up: 3. Use the command `rustup show` to get information about the Rust installation. You should see that the active toolchain is the stable version. 4. Run `rustc --version` to check the installation and version of rust. - Updates can be performed using` rustup update` . + 5. Install build dependancies (Arch packages are listed here, your distribution will + likely be similar): + - `clang`: required by RocksDB. + - `protobuf`: required for protobuf serialization (gRPC). 5. Navigate to the working directory. 6. Run the test by using command `cargo test --all` . By running, it will pass all the required test cases. If you are doing it for the first time, then you can grab a coffee meantime. Usually, it takes time to build, compile and pass all test cases. If there is no error then, it means everything is working properly and it's time to get hand's dirty. In case, if there is an error, then please raise the [issue](https://github.com/sigp/lighthouse/issues). We will help you. 7. As an alternative to, or instead of the above step, you may also run benchmarks by using the command `cargo bench --all` diff --git a/beacon_chain/attestation_validation/Cargo.toml b/beacon_chain/attestation_validation/Cargo.toml index bc357a26a2..b944f90f95 100644 --- a/beacon_chain/attestation_validation/Cargo.toml +++ b/beacon_chain/attestation_validation/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" [dependencies] bls = { path = "../utils/bls" } -db = { path = "../../lighthouse/db" } +db = { path = "../../beacon_node/db" } hashing = { path = "../utils/hashing" } ssz = { path = "../utils/ssz" } types = { path = "../types" } diff --git a/beacon_chain/naive_fork_choice/Cargo.toml b/beacon_chain/naive_fork_choice/Cargo.toml index 9508ed02e3..5b3a1b8d22 100644 --- a/beacon_chain/naive_fork_choice/Cargo.toml +++ b/beacon_chain/naive_fork_choice/Cargo.toml @@ -5,6 +5,6 @@ authors = ["Paul Hauner "] edition = "2018" [dependencies] -db = { path = "../../lighthouse/db" } +db = { path = "../../beacon_node/db" } ssz = { path = "../utils/ssz" } types = { path = "../types" } diff --git a/beacon_chain/types/src/lib.rs b/beacon_chain/types/src/lib.rs index b89cbc50f6..f00c19d037 100644 --- a/beacon_chain/types/src/lib.rs +++ b/beacon_chain/types/src/lib.rs @@ -64,3 +64,5 @@ pub type AttesterMap = HashMap<(u64, u64), Vec>; /// Maps a slot to a block proposer. pub type ProposerMap = HashMap; + +pub use bls::{AggregatePublicKey, AggregateSignature, PublicKey, Signature}; diff --git a/beacon_chain/utils/bls/Cargo.toml b/beacon_chain/utils/bls/Cargo.toml index 9e782b0597..0dc64e483d 100644 --- a/beacon_chain/utils/bls/Cargo.toml +++ b/beacon_chain/utils/bls/Cargo.toml @@ -7,4 +7,5 @@ edition = "2018" [dependencies] bls-aggregates = { git = "https://github.com/sigp/signature-schemes" } hashing = { path = "../hashing" } +hex = "0.3" ssz = { path = "../ssz" } diff --git a/beacon_chain/utils/bls/src/aggregate_signature.rs b/beacon_chain/utils/bls/src/aggregate_signature.rs index 90bf44702b..6012f78c12 100644 --- a/beacon_chain/utils/bls/src/aggregate_signature.rs +++ b/beacon_chain/utils/bls/src/aggregate_signature.rs @@ -6,7 +6,7 @@ use bls_aggregates::AggregateSignature as RawAggregateSignature; /// /// This struct is a wrapper upon a base type and provides helper functions (e.g., SSZ /// serialization). -#[derive(Debug, PartialEq, Clone, Default)] +#[derive(Debug, PartialEq, Clone, Default, Eq)] pub struct AggregateSignature(RawAggregateSignature); impl AggregateSignature { diff --git a/beacon_chain/utils/bls/src/keypair.rs b/beacon_chain/utils/bls/src/keypair.rs index 51a61f8f7d..1cce9c10e7 100644 --- a/beacon_chain/utils/bls/src/keypair.rs +++ b/beacon_chain/utils/bls/src/keypair.rs @@ -1,6 +1,6 @@ use super::{PublicKey, SecretKey}; -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct Keypair { pub sk: SecretKey, pub pk: PublicKey, diff --git a/beacon_chain/utils/bls/src/public_key.rs b/beacon_chain/utils/bls/src/public_key.rs index e7950969e7..4a53893f03 100644 --- a/beacon_chain/utils/bls/src/public_key.rs +++ b/beacon_chain/utils/bls/src/public_key.rs @@ -1,13 +1,15 @@ use super::SecretKey; use bls_aggregates::PublicKey as RawPublicKey; -use ssz::{decode_ssz_list, Decodable, DecodeError, Encodable, SszStream}; +use hex::encode as hex_encode; +use ssz::{decode_ssz_list, ssz_encode, Decodable, DecodeError, Encodable, SszStream}; use std::default; +use std::hash::{Hash, Hasher}; /// A single BLS signature. /// /// This struct is a wrapper upon a base type and provides helper functions (e.g., SSZ /// serialization). -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, Clone, Eq)] pub struct PublicKey(RawPublicKey); impl PublicKey { @@ -19,6 +21,15 @@ impl PublicKey { pub fn as_raw(&self) -> &RawPublicKey { &self.0 } + + /// Returns the last 6 bytes of the SSZ encoding of the public key, as a hex string. + /// + /// Useful for providing a short identifier to the user. + pub fn concatenated_hex_id(&self) -> String { + let bytes = ssz_encode(self); + let end_bytes = &bytes[bytes.len().saturating_sub(6)..bytes.len()]; + hex_encode(end_bytes) + } } impl default::Default for PublicKey { @@ -42,6 +53,18 @@ impl Decodable for PublicKey { } } +impl PartialEq for PublicKey { + fn eq(&self, other: &PublicKey) -> bool { + ssz_encode(self) == ssz_encode(other) + } +} + +impl Hash for PublicKey { + fn hash(&self, state: &mut H) { + ssz_encode(self).hash(state) + } +} + #[cfg(test)] mod tests { use super::super::ssz::ssz_encode; diff --git a/beacon_chain/utils/bls/src/secret_key.rs b/beacon_chain/utils/bls/src/secret_key.rs index 7c4383695b..e86e8dec0f 100644 --- a/beacon_chain/utils/bls/src/secret_key.rs +++ b/beacon_chain/utils/bls/src/secret_key.rs @@ -5,7 +5,7 @@ use ssz::{decode_ssz_list, Decodable, DecodeError, Encodable, SszStream}; /// /// This struct is a wrapper upon a base type and provides helper functions (e.g., SSZ /// serialization). -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Clone, Eq)] pub struct SecretKey(RawSecretKey); impl SecretKey { diff --git a/beacon_chain/utils/bls/src/signature.rs b/beacon_chain/utils/bls/src/signature.rs index 07fc2baff5..7f9e617181 100644 --- a/beacon_chain/utils/bls/src/signature.rs +++ b/beacon_chain/utils/bls/src/signature.rs @@ -6,7 +6,7 @@ use bls_aggregates::Signature as RawSignature; /// /// This struct is a wrapper upon a base type and provides helper functions (e.g., SSZ /// serialization). -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Clone, Eq)] pub struct Signature(RawSignature); impl Signature { diff --git a/beacon_chain/utils/slot_clock/src/lib.rs b/beacon_chain/utils/slot_clock/src/lib.rs index 7bdb775afe..4863f76698 100644 --- a/beacon_chain/utils/slot_clock/src/lib.rs +++ b/beacon_chain/utils/slot_clock/src/lib.rs @@ -4,7 +4,7 @@ mod testing_slot_clock; pub use crate::system_time_slot_clock::{Error as SystemTimeSlotClockError, SystemTimeSlotClock}; pub use crate::testing_slot_clock::{Error as TestingSlotClockError, TestingSlotClock}; -pub trait SlotClock { +pub trait SlotClock: Send + Sync { type Error; fn present_slot(&self) -> Result, Self::Error>; diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml new file mode 100644 index 0000000000..ca78a47677 --- /dev/null +++ b/beacon_node/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "beacon_node" +version = "0.1.0" +authors = ["Paul Hauner "] +edition = "2018" + +[dependencies] +bls = { path = "../beacon_chain/utils/bls" } +grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] } +protobuf = "2.0.2" +protos = { path = "../protos" } +clap = "2.32.0" +db = { path = "db" } +dirs = "1.0.3" +futures = "0.1.23" +slog = "^2.2.3" +slog-term = "^2.4.0" +slog-async = "^2.3.0" +ssz = { path = "../beacon_chain/utils/ssz" } +tokio = "0.1" diff --git a/lighthouse/db/Cargo.toml b/beacon_node/db/Cargo.toml similarity index 100% rename from lighthouse/db/Cargo.toml rename to beacon_node/db/Cargo.toml diff --git a/lighthouse/db/src/disk_db.rs b/beacon_node/db/src/disk_db.rs similarity index 100% rename from lighthouse/db/src/disk_db.rs rename to beacon_node/db/src/disk_db.rs diff --git a/lighthouse/db/src/lib.rs b/beacon_node/db/src/lib.rs similarity index 100% rename from lighthouse/db/src/lib.rs rename to beacon_node/db/src/lib.rs diff --git a/lighthouse/db/src/memory_db.rs b/beacon_node/db/src/memory_db.rs similarity index 100% rename from lighthouse/db/src/memory_db.rs rename to beacon_node/db/src/memory_db.rs diff --git a/lighthouse/db/src/stores/beacon_block_store.rs b/beacon_node/db/src/stores/beacon_block_store.rs similarity index 100% rename from lighthouse/db/src/stores/beacon_block_store.rs rename to beacon_node/db/src/stores/beacon_block_store.rs diff --git a/lighthouse/db/src/stores/beacon_state_store.rs b/beacon_node/db/src/stores/beacon_state_store.rs similarity index 100% rename from lighthouse/db/src/stores/beacon_state_store.rs rename to beacon_node/db/src/stores/beacon_state_store.rs diff --git a/lighthouse/db/src/stores/macros.rs b/beacon_node/db/src/stores/macros.rs similarity index 100% rename from lighthouse/db/src/stores/macros.rs rename to beacon_node/db/src/stores/macros.rs diff --git a/lighthouse/db/src/stores/mod.rs b/beacon_node/db/src/stores/mod.rs similarity index 100% rename from lighthouse/db/src/stores/mod.rs rename to beacon_node/db/src/stores/mod.rs diff --git a/lighthouse/db/src/stores/pow_chain_store.rs b/beacon_node/db/src/stores/pow_chain_store.rs similarity index 100% rename from lighthouse/db/src/stores/pow_chain_store.rs rename to beacon_node/db/src/stores/pow_chain_store.rs diff --git a/lighthouse/db/src/stores/validator_store.rs b/beacon_node/db/src/stores/validator_store.rs similarity index 100% rename from lighthouse/db/src/stores/validator_store.rs rename to beacon_node/db/src/stores/validator_store.rs diff --git a/lighthouse/db/src/traits.rs b/beacon_node/db/src/traits.rs similarity index 100% rename from lighthouse/db/src/traits.rs rename to beacon_node/db/src/traits.rs diff --git a/lighthouse/beacon_chain/src/block_processing.rs b/beacon_node/src/beacon_chain/block_processing.rs similarity index 100% rename from lighthouse/beacon_chain/src/block_processing.rs rename to beacon_node/src/beacon_chain/block_processing.rs diff --git a/lighthouse/beacon_chain/src/block_production.rs b/beacon_node/src/beacon_chain/block_production.rs similarity index 100% rename from lighthouse/beacon_chain/src/block_production.rs rename to beacon_node/src/beacon_chain/block_production.rs diff --git a/lighthouse/beacon_chain/src/lib.rs b/beacon_node/src/beacon_chain/mod.rs similarity index 100% rename from lighthouse/beacon_chain/src/lib.rs rename to beacon_node/src/beacon_chain/mod.rs diff --git a/lighthouse/beacon_chain/tests/chain_test.rs b/beacon_node/src/beacon_chain/tests/chain_test.rs similarity index 100% rename from lighthouse/beacon_chain/tests/chain_test.rs rename to beacon_node/src/beacon_chain/tests/chain_test.rs diff --git a/lighthouse/beacon_chain/src/transition.rs b/beacon_node/src/beacon_chain/transition.rs similarity index 100% rename from lighthouse/beacon_chain/src/transition.rs rename to beacon_node/src/beacon_chain/transition.rs diff --git a/lighthouse/config/mod.rs b/beacon_node/src/config/mod.rs similarity index 97% rename from lighthouse/config/mod.rs rename to beacon_node/src/config/mod.rs index e22b62fb96..5c94e300c7 100644 --- a/lighthouse/config/mod.rs +++ b/beacon_node/src/config/mod.rs @@ -1,5 +1,3 @@ -extern crate dirs; - use std::fs; use std::path::PathBuf; diff --git a/lighthouse/main.rs b/beacon_node/src/main.rs similarity index 84% rename from lighthouse/main.rs rename to beacon_node/src/main.rs index 3102feaf79..08660cbc4b 100644 --- a/lighthouse/main.rs +++ b/beacon_node/src/main.rs @@ -1,20 +1,14 @@ -#[macro_use] extern crate slog; -extern crate slog_async; -extern crate slog_term; -// extern crate ssz; -extern crate clap; -extern crate futures; - -extern crate db; mod config; +mod rpc; use std::path::PathBuf; use crate::config::LighthouseConfig; +use crate::rpc::start_server; use clap::{App, Arg}; -use slog::Drain; +use slog::{error, info, o, Drain}; fn main() { let decorator = slog_term::TermDecorator::new().build(); @@ -64,10 +58,9 @@ fn main() { "data_dir" => &config.data_dir.to_str(), "port" => &config.p2p_listen_port); - error!( - log, - "Lighthouse under development and does not provide a user demo." - ); + let _server = start_server(log.clone()); - info!(log, "Exiting."); + loop { + std::thread::sleep(std::time::Duration::from_secs(1)); + } } diff --git a/beacon_node/src/rpc/beacon_block.rs b/beacon_node/src/rpc/beacon_block.rs new file mode 100644 index 0000000000..a047365efa --- /dev/null +++ b/beacon_node/src/rpc/beacon_block.rs @@ -0,0 +1,57 @@ +use futures::Future; +use grpcio::{RpcContext, UnarySink}; +use protos::services::{ + BeaconBlock as BeaconBlockProto, ProduceBeaconBlockRequest, ProduceBeaconBlockResponse, + PublishBeaconBlockRequest, PublishBeaconBlockResponse, +}; +use protos::services_grpc::BeaconBlockService; +use slog::Logger; + +#[derive(Clone)] +pub struct BeaconBlockServiceInstance { + pub log: Logger, +} + +impl BeaconBlockService for BeaconBlockServiceInstance { + /// Produce a `BeaconBlock` for signing by a validator. + fn produce_beacon_block( + &mut self, + ctx: RpcContext, + req: ProduceBeaconBlockRequest, + sink: UnarySink, + ) { + println!("producing at slot {}", req.get_slot()); + + // TODO: build a legit block. + let mut block = BeaconBlockProto::new(); + block.set_slot(req.get_slot()); + block.set_block_root("cats".as_bytes().to_vec()); + + let mut resp = ProduceBeaconBlockResponse::new(); + resp.set_block(block); + + let f = sink + .success(resp) + .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); + ctx.spawn(f) + } + + /// Accept some fully-formed `BeaconBlock`, process and publish it. + fn publish_beacon_block( + &mut self, + ctx: RpcContext, + req: PublishBeaconBlockRequest, + sink: UnarySink, + ) { + println!("publishing {:?}", req.get_block()); + + // TODO: actually process the block. + let mut resp = PublishBeaconBlockResponse::new(); + resp.set_success(true); + + let f = sink + .success(resp) + .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); + ctx.spawn(f) + } +} diff --git a/beacon_node/src/rpc/mod.rs b/beacon_node/src/rpc/mod.rs new file mode 100644 index 0000000000..6a18a4aa88 --- /dev/null +++ b/beacon_node/src/rpc/mod.rs @@ -0,0 +1,36 @@ +mod beacon_block; +mod validator; + +use self::beacon_block::BeaconBlockServiceInstance; +use self::validator::ValidatorServiceInstance; +use grpcio::{Environment, Server, ServerBuilder}; +use protos::services_grpc::{create_beacon_block_service, create_validator_service}; +use std::sync::Arc; + +use slog::{info, Logger}; + +pub fn start_server(log: Logger) -> Server { + let log_clone = log.clone(); + let env = Arc::new(Environment::new(1)); + + let beacon_block_service = { + let instance = BeaconBlockServiceInstance { log: log.clone() }; + create_beacon_block_service(instance) + }; + let validator_service = { + let instance = ValidatorServiceInstance { log: log.clone() }; + create_validator_service(instance) + }; + + let mut server = ServerBuilder::new(env) + .register_service(beacon_block_service) + .register_service(validator_service) + .bind("127.0.0.1", 50_051) + .build() + .unwrap(); + server.start(); + for &(ref host, port) in server.bind_addrs() { + info!(log_clone, "gRPC listening on {}:{}", host, port); + } + server +} diff --git a/beacon_node/src/rpc/validator.rs b/beacon_node/src/rpc/validator.rs new file mode 100644 index 0000000000..f894deca6b --- /dev/null +++ b/beacon_node/src/rpc/validator.rs @@ -0,0 +1,64 @@ +use bls::PublicKey; +use futures::Future; +use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; +use protos::services::{ + IndexResponse, ProposeBlockSlotRequest, ProposeBlockSlotResponse, PublicKey as PublicKeyRequest, +}; +use protos::services_grpc::ValidatorService; +use slog::{debug, Logger}; +use ssz::Decodable; + +#[derive(Clone)] +pub struct ValidatorServiceInstance { + pub log: Logger, +} + +impl ValidatorService for ValidatorServiceInstance { + fn validator_index( + &mut self, + ctx: RpcContext, + req: PublicKeyRequest, + sink: UnarySink, + ) { + if let Ok((public_key, _)) = PublicKey::ssz_decode(req.get_public_key(), 0) { + debug!(self.log, "RPC request"; "endpoint" => "ValidatorIndex", "public_key" => public_key.concatenated_hex_id()); + + let mut resp = IndexResponse::new(); + + // TODO: return a legit value. + resp.set_index(1); + + let f = sink + .success(resp) + .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); + ctx.spawn(f) + } else { + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::InvalidArgument, + Some("Invalid public_key".to_string()), + )) + .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); + ctx.spawn(f) + } + } + + fn propose_block_slot( + &mut self, + ctx: RpcContext, + req: ProposeBlockSlotRequest, + sink: UnarySink, + ) { + debug!(self.log, "RPC request"; "endpoint" => "ProposeBlockSlot", "epoch" => req.get_epoch(), "validator_index" => req.get_validator_index()); + + let mut resp = ProposeBlockSlotResponse::new(); + + // TODO: return a legit value. + resp.set_slot(1); + + let f = sink + .success(resp) + .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); + ctx.spawn(f) + } +} diff --git a/lighthouse/beacon_chain/Cargo.toml b/lighthouse/beacon_chain/Cargo.toml deleted file mode 100644 index 5e27949773..0000000000 --- a/lighthouse/beacon_chain/Cargo.toml +++ /dev/null @@ -1,17 +0,0 @@ -[package] -name = "chain" -version = "0.1.0" -authors = ["Paul Hauner "] -edition = "2018" - -[dependencies] -bls = { path = "../../beacon_chain/utils/bls" } -db = { path = "../db" } -genesis = { path = "../../beacon_chain/genesis" } -naive_fork_choice = { path = "../../beacon_chain/naive_fork_choice" } -slot_clock = { path = "../../beacon_chain/utils/slot_clock" } -spec = { path = "../../beacon_chain/spec" } -ssz = { path = "../../beacon_chain/utils/ssz" } -types = { path = "../../beacon_chain/types" } -validator_induction = { path = "../../beacon_chain/validator_induction" } -validator_shuffling = { path = "../../beacon_chain/validator_shuffling" } diff --git a/protos/.gitignore b/protos/.gitignore new file mode 100644 index 0000000000..7104339d9b --- /dev/null +++ b/protos/.gitignore @@ -0,0 +1,2 @@ +src/services.rs +src/services_grpc.rs diff --git a/protos/Cargo.toml b/protos/Cargo.toml new file mode 100644 index 0000000000..56364d188b --- /dev/null +++ b/protos/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "protos" +version = "0.1.0" +authors = ["Paul Hauner "] +edition = "2018" +description = "Google protobuf message and service definitions used in Lighthouse APIs." + +[dependencies] +futures = "0.1" +grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] } +protobuf = "2.0" + +[build-dependencies] +protoc-grpcio = "0.3.1" diff --git a/protos/build.rs b/protos/build.rs new file mode 100644 index 0000000000..d8becfa519 --- /dev/null +++ b/protos/build.rs @@ -0,0 +1,8 @@ +extern crate protoc_grpcio; + +fn main() { + let proto_root = "src/"; + println!("cargo:rerun-if-changed={}", proto_root); + protoc_grpcio::compile_grpc_protos(&["services.proto"], &[proto_root], &proto_root) + .expect("Failed to compile gRPC definitions!"); +} diff --git a/protos/src/lib.rs b/protos/src/lib.rs new file mode 100644 index 0000000000..a9684909d6 --- /dev/null +++ b/protos/src/lib.rs @@ -0,0 +1,2 @@ +pub mod services; +pub mod services_grpc; diff --git a/protos/src/services.proto b/protos/src/services.proto new file mode 100644 index 0000000000..16e2d4dba7 --- /dev/null +++ b/protos/src/services.proto @@ -0,0 +1,94 @@ +// TODO: This setup requires that the BN (beacon node) holds the block in state +// during the interval between the `GenerateProposalRequest` and the +// `SubmitProposalRequest`. +// +// This is sub-optimal as if a validator client switches BN during this process +// the block will be lost. +// +// This "stateful" method is being used presently because it's easier and +// requires less maintainence as the `BeaconBlock` definition changes. + +syntax = "proto3"; + +package ethereum.beacon.rpc.v1; + +service BeaconBlockService { + rpc ProduceBeaconBlock(ProduceBeaconBlockRequest) returns (ProduceBeaconBlockResponse); + rpc PublishBeaconBlock(PublishBeaconBlockRequest) returns (PublishBeaconBlockResponse); +} + +service ValidatorService { + // rpc ValidatorAssignment(ValidatorAssignmentRequest) returns (ValidatorAssignmentResponse); + rpc ProposeBlockSlot(ProposeBlockSlotRequest) returns (ProposeBlockSlotResponse); + rpc ValidatorIndex(PublicKey) returns (IndexResponse); +} + +message BeaconBlock { + uint64 slot = 1; + bytes block_root = 2; + bytes randao_reveal = 3; + bytes signature = 4; +} + +// Validator requests an unsigned proposal. +message ProduceBeaconBlockRequest { + uint64 slot = 1; +} + +// Beacon node returns an unsigned proposal. +message ProduceBeaconBlockResponse { + BeaconBlock block = 1; +} + +// Validator submits a signed proposal. +message PublishBeaconBlockRequest { + BeaconBlock block = 1; +} + +// Beacon node indicates a sucessfully submitted proposal. +message PublishBeaconBlockResponse { + bool success = 1; + bytes msg = 2; +} + +// A validators duties for some epoch. +// TODO: add shard duties. +message ValidatorAssignment { + oneof block_production_slot_oneof { + bool block_production_slot_none = 1; + uint64 block_production_slot = 2; + } +} + +message ValidatorAssignmentRequest { + uint64 epoch = 1; + bytes validator_index = 2; +} + +/* + * Propose slot + */ + +message ProposeBlockSlotRequest { + uint64 epoch = 1; + uint64 validator_index = 2; +} + +message ProposeBlockSlotResponse { + oneof slot_oneof { + bool none = 1; + uint64 slot = 2; + } +} + +/* + * Validator Assignment + */ + +message PublicKey { + bytes public_key = 1; +} + +message IndexResponse { + uint64 index = 1; +} diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml new file mode 100644 index 0000000000..05a9a640fe --- /dev/null +++ b/validator_client/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "validator_client" +version = "0.1.0" +authors = ["Paul Hauner "] +edition = "2018" + +[dependencies] +bls = { path = "../beacon_chain/utils/bls" } +clap = "2.32.0" +dirs = "1.0.3" +grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] } +protobuf = "2.0.2" +protos = { path = "../protos" } +slot_clock = { path = "../beacon_chain/utils/slot_clock" } +spec = { path = "../beacon_chain/spec" } +types = { path = "../beacon_chain/types" } +slog = "^2.2.3" +slog-term = "^2.4.0" +slog-async = "^2.3.0" +ssz = { path = "../beacon_chain/utils/ssz" } diff --git a/validator_client/README.md b/validator_client/README.md new file mode 100644 index 0000000000..aa84fe0139 --- /dev/null +++ b/validator_client/README.md @@ -0,0 +1,67 @@ +# Lighthouse Validator Client + +The Validator Client (VC) is a stand-alone binary which connects to a Beacon +Node (BN) and fulfils the roles of a validator. + +## Roles + +The VC is responsible for the following tasks: + +- Requesting validator duties (a.k.a. shuffling) from the BN. +- Prompting the BN to produce a new block, when a validators block production + duties require. +- Completing all the fields on a new block (e.g., RANDAO reveal, signature) and + publishing the block to a BN. +- Prompting the BN to produce a new shard atteststation as per a validators + duties. +- Ensuring that no slashable messages are signed by a validator private key. +- Keeping track of the system clock and how it relates to slots/epochs. + +The VC is capable of managing multiple validators in the same process tree. + +## Implementation + +_This section describes the present implementation of this VC binary._ + +### Services + +Each validator is represented by two services, one which tracks the validator +duties and another which performs block production duties. + +A separate thread is maintained for each service, for each validator. As such, +a single validator utilises three (3) threads (one for the base VC and two for +each service) and two validators utilise five (5) threads. + +#### `DutiesManagerService` + +Polls a BN and requests validator responsibilities, as well as a validator +index. The outcome of a successful poll is a `EpochDuties` struct: + +```rust +EpochDuties { + validator_index: u64, + block_prodcution_slot: u64, +} +``` + +This is stored in the `EpochDutiesMap`, a `HashMap` mapping `epoch -> +EpochDuties`. + +#### `BlockProducerService` + +Polls the system clock and determines if a block needs to be produced. Reads +from the `EpochDutiesMap` maintained by the `DutiesManagerService`. + +If block production is required, performs all the necessary duties to request, +complete and return a block from the BN. + +### Configuration + +Presently the validator specifics (pubkey, etc.) are randomly generated and the +chain specification (slot length, BLS domain, etc.) are fixed to foundation +parameters. This is temporary and will be upgrade so these parameters can be +read from file (or initialized on first-boot). + +## BN Communication + +The VC communicates with the BN via a gRPC/protobuf connection. diff --git a/validator_client/src/block_producer/grpc.rs b/validator_client/src/block_producer/grpc.rs new file mode 100644 index 0000000000..033965b169 --- /dev/null +++ b/validator_client/src/block_producer/grpc.rs @@ -0,0 +1,74 @@ +use super::traits::{BeaconNode, BeaconNodeError}; +use protos::services::{ + BeaconBlock as GrpcBeaconBlock, ProduceBeaconBlockRequest, PublishBeaconBlockRequest, +}; +use protos::services_grpc::BeaconBlockServiceClient; +use ssz::{ssz_encode, Decodable}; +use types::{BeaconBlock, BeaconBlockBody, Hash256, Signature}; + +impl BeaconNode for BeaconBlockServiceClient { + /// Request a Beacon Node (BN) to produce a new block at the supplied slot. + /// + /// Returns `None` if it is not possible to produce at the supplied slot. For example, if the + /// BN is unable to find a parent block. + fn produce_beacon_block(&self, slot: u64) -> Result, BeaconNodeError> { + let mut req = ProduceBeaconBlockRequest::new(); + req.set_slot(slot); + + let reply = self + .produce_beacon_block(&req) + .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; + + if reply.has_block() { + let block = reply.get_block(); + + let (signature, _) = Signature::ssz_decode(block.get_signature(), 0) + .map_err(|_| BeaconNodeError::DecodeFailure)?; + + // TODO: this conversion is incomplete; fix it. + Ok(Some(BeaconBlock { + slot: block.get_slot(), + parent_root: Hash256::zero(), + state_root: Hash256::zero(), + randao_reveal: Hash256::from(block.get_randao_reveal()), + candidate_pow_receipt_root: Hash256::zero(), + signature, + body: BeaconBlockBody { + proposer_slashings: vec![], + casper_slashings: vec![], + attestations: vec![], + custody_reseeds: vec![], + custody_challenges: vec![], + custody_responses: vec![], + deposits: vec![], + exits: vec![], + }, + })) + } else { + Ok(None) + } + } + + /// Request a Beacon Node (BN) to publish a block. + /// + /// Generally, this will be called after a `produce_beacon_block` call with a block that has + /// been completed (signed) by the validator client. + fn publish_beacon_block(&self, block: BeaconBlock) -> Result { + let mut req = PublishBeaconBlockRequest::new(); + + // TODO: this conversion is incomplete; fix it. + let mut grpc_block = GrpcBeaconBlock::new(); + grpc_block.set_slot(block.slot); + grpc_block.set_block_root(vec![0]); + grpc_block.set_randao_reveal(block.randao_reveal.to_vec()); + grpc_block.set_signature(ssz_encode(&block.signature)); + + req.set_block(grpc_block); + + let reply = self + .publish_beacon_block(&req) + .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; + + Ok(reply.get_success()) + } +} diff --git a/validator_client/src/block_producer/mod.rs b/validator_client/src/block_producer/mod.rs new file mode 100644 index 0000000000..f3cd0199bb --- /dev/null +++ b/validator_client/src/block_producer/mod.rs @@ -0,0 +1,254 @@ +mod grpc; +mod service; +#[cfg(test)] +mod test_node; +mod traits; + +use self::traits::{BeaconNode, BeaconNodeError}; +use super::EpochDutiesMap; +use slot_clock::SlotClock; +use spec::ChainSpec; +use std::sync::{Arc, RwLock}; +use types::BeaconBlock; + +pub use self::service::BlockProducerService; + +#[derive(Debug, PartialEq)] +pub enum PollOutcome { + /// A new block was produced. + BlockProduced(u64), + /// A block was not produced as it would have been slashable. + SlashableBlockNotProduced(u64), + /// The validator duties did not require a block to be produced. + BlockProductionNotRequired(u64), + /// The duties for the present epoch were not found. + ProducerDutiesUnknown(u64), + /// The slot has already been processed, execution was skipped. + SlotAlreadyProcessed(u64), + /// The Beacon Node was unable to produce a block at that slot. + BeaconNodeUnableToProduceBlock(u64), +} + +#[derive(Debug, PartialEq)] +pub enum Error { + SlotClockError, + SlotUnknowable, + EpochMapPoisoned, + SlotClockPoisoned, + EpochLengthIsZero, + BeaconNodeError(BeaconNodeError), +} + +/// A polling state machine which performs block production duties, based upon some epoch duties +/// (`EpochDutiesMap`) and a concept of time (`SlotClock`). +/// +/// Ensures that messages are not slashable. +/// +/// Relies upon an external service to keep the `EpochDutiesMap` updated. +pub struct BlockProducer { + pub last_processed_slot: u64, + spec: Arc, + epoch_map: Arc>, + slot_clock: Arc>, + beacon_node: Arc, +} + +impl BlockProducer { + /// Returns a new instance where `last_processed_slot == 0`. + pub fn new( + spec: Arc, + epoch_map: Arc>, + slot_clock: Arc>, + beacon_node: Arc, + ) -> Self { + Self { + last_processed_slot: 0, + spec, + epoch_map, + slot_clock, + beacon_node, + } + } +} + +impl BlockProducer { + /// "Poll" to see if the validator is required to take any action. + /// + /// The slot clock will be read and any new actions undertaken. + pub fn poll(&mut self) -> Result { + let slot = self + .slot_clock + .read() + .map_err(|_| Error::SlotClockPoisoned)? + .present_slot() + .map_err(|_| Error::SlotClockError)? + .ok_or(Error::SlotUnknowable)?; + + let epoch = slot + .checked_div(self.spec.epoch_length) + .ok_or(Error::EpochLengthIsZero)?; + + // If this is a new slot. + if slot > self.last_processed_slot { + let is_block_production_slot = { + let epoch_map = self.epoch_map.read().map_err(|_| Error::EpochMapPoisoned)?; + match epoch_map.get(&epoch) { + None => return Ok(PollOutcome::ProducerDutiesUnknown(slot)), + Some(duties) => duties.is_block_production_slot(slot), + } + }; + + if is_block_production_slot { + self.last_processed_slot = slot; + + self.produce_block(slot) + } else { + Ok(PollOutcome::BlockProductionNotRequired(slot)) + } + } else { + Ok(PollOutcome::SlotAlreadyProcessed(slot)) + } + } + + /// Produce a block at some slot. + /// + /// Assumes that a block is required at this slot (does not check the duties). + /// + /// Ensures the message is not slashable. + /// + /// !!! UNSAFE !!! + /// + /// The slash-protection code is not yet implemented. There is zero protection against + /// slashing. + fn produce_block(&mut self, slot: u64) -> Result { + if let Some(block) = self.beacon_node.produce_beacon_block(slot)? { + if self.safe_to_produce(&block) { + let block = self.sign_block(block); + self.beacon_node.publish_beacon_block(block)?; + Ok(PollOutcome::BlockProduced(slot)) + } else { + Ok(PollOutcome::SlashableBlockNotProduced(slot)) + } + } else { + Ok(PollOutcome::BeaconNodeUnableToProduceBlock(slot)) + } + } + + /// Consumes a block, returning that block signed by the validators private key. + /// + /// Important: this function will not check to ensure the block is not slashable. This must be + /// done upstream. + fn sign_block(&mut self, block: BeaconBlock) -> BeaconBlock { + // TODO: sign the block + // https://github.com/sigp/lighthouse/issues/160 + self.store_produce(&block); + block + } + + /// Returns `true` if signing a block is safe (non-slashable). + /// + /// !!! UNSAFE !!! + /// + /// Important: this function is presently stubbed-out. It provides ZERO SAFETY. + fn safe_to_produce(&self, _block: &BeaconBlock) -> bool { + // TODO: ensure the producer doesn't produce slashable blocks. + // https://github.com/sigp/lighthouse/issues/160 + true + } + + /// Record that a block was produced so that slashable votes may not be made in the future. + /// + /// !!! UNSAFE !!! + /// + /// Important: this function is presently stubbed-out. It provides ZERO SAFETY. + fn store_produce(&mut self, _block: &BeaconBlock) { + // TODO: record this block production to prevent future slashings. + // https://github.com/sigp/lighthouse/issues/160 + } +} + +impl From for Error { + fn from(e: BeaconNodeError) -> Error { + Error::BeaconNodeError(e) + } +} + +#[cfg(test)] +mod tests { + use super::test_node::TestBeaconNode; + use super::*; + use crate::duties::EpochDuties; + use slot_clock::TestingSlotClock; + use types::test_utils::{SeedableRng, TestRandom, XorShiftRng}; + + // TODO: implement more thorough testing. + // https://github.com/sigp/lighthouse/issues/160 + // + // These tests should serve as a good example for future tests. + + #[test] + pub fn polling() { + let mut rng = XorShiftRng::from_seed([42; 16]); + + let spec = Arc::new(ChainSpec::foundation()); + let epoch_map = Arc::new(RwLock::new(EpochDutiesMap::new())); + let slot_clock = Arc::new(RwLock::new(TestingSlotClock::new(0))); + let beacon_node = Arc::new(TestBeaconNode::default()); + + let mut block_producer = BlockProducer::new( + spec.clone(), + epoch_map.clone(), + slot_clock.clone(), + beacon_node.clone(), + ); + + // Configure responses from the BeaconNode. + beacon_node.set_next_produce_result(Ok(Some(BeaconBlock::random_for_test(&mut rng)))); + beacon_node.set_next_publish_result(Ok(true)); + + // Setup some valid duties for the validator + let produce_slot = 100; + let duties = EpochDuties { + block_production_slot: Some(produce_slot), + ..std::default::Default::default() + }; + let produce_epoch = produce_slot / spec.epoch_length; + epoch_map.write().unwrap().insert(produce_epoch, duties); + + // One slot before production slot... + slot_clock.write().unwrap().set_slot(produce_slot - 1); + assert_eq!( + block_producer.poll(), + Ok(PollOutcome::BlockProductionNotRequired(produce_slot - 1)) + ); + + // On the produce slot... + slot_clock.write().unwrap().set_slot(produce_slot); + assert_eq!( + block_producer.poll(), + Ok(PollOutcome::BlockProduced(produce_slot)) + ); + + // Trying the same produce slot again... + slot_clock.write().unwrap().set_slot(produce_slot); + assert_eq!( + block_producer.poll(), + Ok(PollOutcome::SlotAlreadyProcessed(produce_slot)) + ); + + // One slot after the produce slot... + slot_clock.write().unwrap().set_slot(produce_slot + 1); + assert_eq!( + block_producer.poll(), + Ok(PollOutcome::BlockProductionNotRequired(produce_slot + 1)) + ); + + // In an epoch without known duties... + let slot = (produce_epoch + 1) * spec.epoch_length; + slot_clock.write().unwrap().set_slot(slot); + assert_eq!( + block_producer.poll(), + Ok(PollOutcome::ProducerDutiesUnknown(slot)) + ); + } +} diff --git a/validator_client/src/block_producer/service.rs b/validator_client/src/block_producer/service.rs new file mode 100644 index 0000000000..ffdb330293 --- /dev/null +++ b/validator_client/src/block_producer/service.rs @@ -0,0 +1,45 @@ +use super::traits::BeaconNode; +use super::{BlockProducer, PollOutcome as BlockProducerPollOutcome, SlotClock}; +use slog::{error, info, warn, Logger}; +use std::time::Duration; + +pub struct BlockProducerService { + pub block_producer: BlockProducer, + pub poll_interval_millis: u64, + pub log: Logger, +} + +impl BlockProducerService { + /// Run a loop which polls the block producer each `poll_interval_millis` millseconds. + /// + /// Logs the results of the polls. + pub fn run(&mut self) { + loop { + match self.block_producer.poll() { + Err(error) => { + error!(self.log, "Block producer poll error"; "error" => format!("{:?}", error)) + } + Ok(BlockProducerPollOutcome::BlockProduced(slot)) => { + info!(self.log, "Produced block"; "slot" => slot) + } + Ok(BlockProducerPollOutcome::SlashableBlockNotProduced(slot)) => { + warn!(self.log, "Slashable block was not signed"; "slot" => slot) + } + Ok(BlockProducerPollOutcome::BlockProductionNotRequired(slot)) => { + info!(self.log, "Block production not required"; "slot" => slot) + } + Ok(BlockProducerPollOutcome::ProducerDutiesUnknown(slot)) => { + error!(self.log, "Block production duties unknown"; "slot" => slot) + } + Ok(BlockProducerPollOutcome::SlotAlreadyProcessed(slot)) => { + warn!(self.log, "Attempted to re-process slot"; "slot" => slot) + } + Ok(BlockProducerPollOutcome::BeaconNodeUnableToProduceBlock(slot)) => { + error!(self.log, "Beacon node unable to produce block"; "slot" => slot) + } + }; + + std::thread::sleep(Duration::from_millis(self.poll_interval_millis)); + } + } +} diff --git a/validator_client/src/block_producer/test_node.rs b/validator_client/src/block_producer/test_node.rs new file mode 100644 index 0000000000..e99613e8f3 --- /dev/null +++ b/validator_client/src/block_producer/test_node.rs @@ -0,0 +1,47 @@ +use super::traits::{BeaconNode, BeaconNodeError}; +use std::sync::RwLock; +use types::BeaconBlock; + +type ProduceResult = Result, BeaconNodeError>; +type PublishResult = Result; + +/// A test-only struct used to simulate a Beacon Node. +#[derive(Default)] +pub struct TestBeaconNode { + pub produce_input: RwLock>, + pub produce_result: RwLock>, + pub publish_input: RwLock>, + pub publish_result: RwLock>, +} + +impl TestBeaconNode { + /// Set the result to be returned when `produce_beacon_block` is called. + pub fn set_next_produce_result(&self, result: ProduceResult) { + *self.produce_result.write().unwrap() = Some(result); + } + + /// Set the result to be returned when `publish_beacon_block` is called. + pub fn set_next_publish_result(&self, result: PublishResult) { + *self.publish_result.write().unwrap() = Some(result); + } +} + +impl BeaconNode for TestBeaconNode { + /// Returns the value specified by the `set_next_produce_result`. + fn produce_beacon_block(&self, slot: u64) -> ProduceResult { + *self.produce_input.write().unwrap() = Some(slot); + match *self.produce_result.read().unwrap() { + Some(ref r) => r.clone(), + None => panic!("TestBeaconNode: produce_result == None"), + } + } + + /// Returns the value specified by the `set_next_publish_result`. + fn publish_beacon_block(&self, block: BeaconBlock) -> PublishResult { + *self.publish_input.write().unwrap() = Some(block); + match *self.publish_result.read().unwrap() { + Some(ref r) => r.clone(), + None => panic!("TestBeaconNode: publish_result == None"), + } + } +} diff --git a/validator_client/src/block_producer/traits.rs b/validator_client/src/block_producer/traits.rs new file mode 100644 index 0000000000..be1c73bda5 --- /dev/null +++ b/validator_client/src/block_producer/traits.rs @@ -0,0 +1,19 @@ +use types::BeaconBlock; + +#[derive(Debug, PartialEq, Clone)] +pub enum BeaconNodeError { + RemoteFailure(String), + DecodeFailure, +} + +/// Defines the methods required to produce and publish blocks on a Beacon Node. +pub trait BeaconNode: Send + Sync { + /// Request that the node produces a block. + /// + /// Returns Ok(None) if the Beacon Node is unable to produce at the given slot. + fn produce_beacon_block(&self, slot: u64) -> Result, BeaconNodeError>; + /// Request that the node publishes a block. + /// + /// Returns `true` if the publish was sucessful. + fn publish_beacon_block(&self, block: BeaconBlock) -> Result; +} diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs new file mode 100644 index 0000000000..104a4bbe66 --- /dev/null +++ b/validator_client/src/config.rs @@ -0,0 +1,25 @@ +use std::fs; +use std::path::PathBuf; + +/// Stores the core configuration for this validator instance. +#[derive(Clone)] +pub struct ClientConfig { + pub data_dir: PathBuf, + pub server: String, +} + +const DEFAULT_LIGHTHOUSE_DIR: &str = ".lighthouse-validators"; + +impl ClientConfig { + /// Build a new configuration from defaults. + pub fn default() -> Self { + let data_dir = { + let home = dirs::home_dir().expect("Unable to determine home dir."); + home.join(DEFAULT_LIGHTHOUSE_DIR) + }; + fs::create_dir_all(&data_dir) + .unwrap_or_else(|_| panic!("Unable to create {:?}", &data_dir)); + let server = "localhost:50051".to_string(); + Self { data_dir, server } + } +} diff --git a/validator_client/src/duties/grpc.rs b/validator_client/src/duties/grpc.rs new file mode 100644 index 0000000000..b4a2fac514 --- /dev/null +++ b/validator_client/src/duties/grpc.rs @@ -0,0 +1,49 @@ +use super::traits::{BeaconNode, BeaconNodeError}; +use super::EpochDuties; +use protos::services::{ProposeBlockSlotRequest, PublicKey as IndexRequest}; +use protos::services_grpc::ValidatorServiceClient; +use ssz::ssz_encode; +use types::PublicKey; + +impl BeaconNode for ValidatorServiceClient { + /// Request the shuffling from the Beacon Node (BN). + /// + /// As this function takes a `PublicKey`, it will first attempt to resolve the public key into + /// a validator index, then call the BN for production/attestation duties. + /// + /// Note: presently only block production information is returned. + fn request_shuffling( + &self, + epoch: u64, + public_key: &PublicKey, + ) -> Result, BeaconNodeError> { + // Lookup the validator index for the supplied public key. + let validator_index = { + let mut req = IndexRequest::new(); + req.set_public_key(ssz_encode(public_key).to_vec()); + let resp = self + .validator_index(&req) + .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; + resp.get_index() + }; + + let mut req = ProposeBlockSlotRequest::new(); + req.set_validator_index(validator_index); + req.set_epoch(epoch); + + let reply = self + .propose_block_slot(&req) + .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; + + let block_production_slot = if reply.has_slot() { + Some(reply.get_slot()) + } else { + None + }; + + Ok(Some(EpochDuties { + validator_index, + block_production_slot, + })) + } +} diff --git a/validator_client/src/duties/mod.rs b/validator_client/src/duties/mod.rs new file mode 100644 index 0000000000..4656715bae --- /dev/null +++ b/validator_client/src/duties/mod.rs @@ -0,0 +1,179 @@ +mod grpc; +mod service; +#[cfg(test)] +mod test_node; +mod traits; + +use self::traits::{BeaconNode, BeaconNodeError}; +use bls::PublicKey; +use slot_clock::SlotClock; +use spec::ChainSpec; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +pub use self::service::DutiesManagerService; + +/// The information required for a validator to propose and attest during some epoch. +/// +/// Generally obtained from a Beacon Node, this information contains the validators canonical index +/// (thier sequence in the global validator induction process) and the "shuffling" for that index +/// for some epoch. +#[derive(Debug, PartialEq, Clone, Copy, Default)] +pub struct EpochDuties { + pub validator_index: u64, + pub block_production_slot: Option, + // Future shard info +} + +impl EpochDuties { + /// Returns `true` if the supplied `slot` is a slot in which the validator should produce a + /// block. + pub fn is_block_production_slot(&self, slot: u64) -> bool { + match self.block_production_slot { + Some(s) if s == slot => true, + _ => false, + } + } +} + +/// Maps an `epoch` to some `EpochDuties` for a single validator. +pub type EpochDutiesMap = HashMap; + +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum PollOutcome { + /// The `EpochDuties` were not updated during this poll. + NoChange(u64), + /// The `EpochDuties` for the `epoch` were previously unknown, but obtained in the poll. + NewDuties(u64, EpochDuties), + /// New `EpochDuties` were obtained, different to those which were previously known. This is + /// likely to be the result of chain re-organisation. + DutiesChanged(u64, EpochDuties), + /// The Beacon Node was unable to return the duties as the validator is unknown, or the + /// shuffling for the epoch is unknown. + UnknownValidatorOrEpoch(u64), +} + +#[derive(Debug, PartialEq)] +pub enum Error { + SlotClockError, + SlotUnknowable, + EpochMapPoisoned, + SlotClockPoisoned, + EpochLengthIsZero, + BeaconNodeError(BeaconNodeError), +} + +/// A polling state machine which ensures the latest `EpochDuties` are obtained from the Beacon +/// Node. +/// +/// There is a single `DutiesManager` per validator instance. +pub struct DutiesManager { + pub duties_map: Arc>, + /// The validator's public key. + pub pubkey: PublicKey, + pub spec: Arc, + pub slot_clock: Arc>, + pub beacon_node: Arc, +} + +impl DutiesManager { + /// Poll the Beacon Node for `EpochDuties`. + /// + /// The present `epoch` will be learned from the supplied `SlotClock`. In production this will + /// be a wall-clock (e.g., system time, remote server time, etc.). + pub fn poll(&self) -> Result { + let slot = self + .slot_clock + .read() + .map_err(|_| Error::SlotClockPoisoned)? + .present_slot() + .map_err(|_| Error::SlotClockError)? + .ok_or(Error::SlotUnknowable)?; + + let epoch = slot + .checked_div(self.spec.epoch_length) + .ok_or(Error::EpochLengthIsZero)?; + + if let Some(duties) = self.beacon_node.request_shuffling(epoch, &self.pubkey)? { + let mut map = self + .duties_map + .write() + .map_err(|_| Error::EpochMapPoisoned)?; + + // If these duties were known, check to see if they're updates or identical. + let result = if let Some(known_duties) = map.get(&epoch) { + if *known_duties == duties { + Ok(PollOutcome::NoChange(epoch)) + } else { + Ok(PollOutcome::DutiesChanged(epoch, duties)) + } + } else { + Ok(PollOutcome::NewDuties(epoch, duties)) + }; + map.insert(epoch, duties); + result + } else { + Ok(PollOutcome::UnknownValidatorOrEpoch(epoch)) + } + } +} + +impl From for Error { + fn from(e: BeaconNodeError) -> Error { + Error::BeaconNodeError(e) + } +} + +#[cfg(test)] +mod tests { + use super::test_node::TestBeaconNode; + use super::*; + use bls::Keypair; + use slot_clock::TestingSlotClock; + + // TODO: implement more thorough testing. + // https://github.com/sigp/lighthouse/issues/160 + // + // These tests should serve as a good example for future tests. + + #[test] + pub fn polling() { + let spec = Arc::new(ChainSpec::foundation()); + let duties_map = Arc::new(RwLock::new(EpochDutiesMap::new())); + let keypair = Keypair::random(); + let slot_clock = Arc::new(RwLock::new(TestingSlotClock::new(0))); + let beacon_node = Arc::new(TestBeaconNode::default()); + + let manager = DutiesManager { + spec: spec.clone(), + pubkey: keypair.pk.clone(), + duties_map: duties_map.clone(), + slot_clock: slot_clock.clone(), + beacon_node: beacon_node.clone(), + }; + + // Configure response from the BeaconNode. + let duties = EpochDuties { + validator_index: 0, + block_production_slot: Some(10), + }; + beacon_node.set_next_shuffling_result(Ok(Some(duties))); + + // Get the duties for the first time... + assert_eq!(manager.poll(), Ok(PollOutcome::NewDuties(0, duties))); + // Get the same duties again... + assert_eq!(manager.poll(), Ok(PollOutcome::NoChange(0))); + + // Return new duties. + let duties = EpochDuties { + validator_index: 0, + block_production_slot: Some(11), + }; + beacon_node.set_next_shuffling_result(Ok(Some(duties))); + assert_eq!(manager.poll(), Ok(PollOutcome::DutiesChanged(0, duties))); + + // Return no duties. + beacon_node.set_next_shuffling_result(Ok(None)); + assert_eq!(manager.poll(), Ok(PollOutcome::UnknownValidatorOrEpoch(0))); + } +} diff --git a/validator_client/src/duties/service.rs b/validator_client/src/duties/service.rs new file mode 100644 index 0000000000..bdb6faefae --- /dev/null +++ b/validator_client/src/duties/service.rs @@ -0,0 +1,40 @@ +use super::traits::BeaconNode; +use super::{DutiesManager, PollOutcome}; +use slog::{debug, error, info, Logger}; +use slot_clock::SlotClock; +use std::time::Duration; + +pub struct DutiesManagerService { + pub manager: DutiesManager, + pub poll_interval_millis: u64, + pub log: Logger, +} + +impl DutiesManagerService { + /// Run a loop which polls the manager each `poll_interval_millis` milliseconds. + /// + /// Logs the results of the polls. + pub fn run(&mut self) { + loop { + match self.manager.poll() { + Err(error) => { + error!(self.log, "Epoch duties poll error"; "error" => format!("{:?}", error)) + } + Ok(PollOutcome::NoChange(epoch)) => { + debug!(self.log, "No change in duties"; "epoch" => epoch) + } + Ok(PollOutcome::DutiesChanged(epoch, duties)) => { + info!(self.log, "Duties changed (potential re-org)"; "epoch" => epoch, "duties" => format!("{:?}", duties)) + } + Ok(PollOutcome::NewDuties(epoch, duties)) => { + info!(self.log, "New duties obtained"; "epoch" => epoch, "duties" => format!("{:?}", duties)) + } + Ok(PollOutcome::UnknownValidatorOrEpoch(epoch)) => { + error!(self.log, "Epoch or validator unknown"; "epoch" => epoch) + } + }; + + std::thread::sleep(Duration::from_millis(self.poll_interval_millis)); + } + } +} diff --git a/validator_client/src/duties/test_node.rs b/validator_client/src/duties/test_node.rs new file mode 100644 index 0000000000..2b1d651728 --- /dev/null +++ b/validator_client/src/duties/test_node.rs @@ -0,0 +1,31 @@ +use super::traits::{BeaconNode, BeaconNodeError}; +use super::EpochDuties; +use bls::PublicKey; +use std::sync::RwLock; + +type ShufflingResult = Result, BeaconNodeError>; + +/// A test-only struct used to simulate a Beacon Node. +#[derive(Default)] +pub struct TestBeaconNode { + pub request_shuffling_input: RwLock>, + pub request_shuffling_result: RwLock>, +} + +impl TestBeaconNode { + /// Set the result to be returned when `request_shuffling` is called. + pub fn set_next_shuffling_result(&self, result: ShufflingResult) { + *self.request_shuffling_result.write().unwrap() = Some(result); + } +} + +impl BeaconNode for TestBeaconNode { + /// Returns the value specified by the `set_next_shuffling_result`. + fn request_shuffling(&self, epoch: u64, public_key: &PublicKey) -> ShufflingResult { + *self.request_shuffling_input.write().unwrap() = Some((epoch, public_key.clone())); + match *self.request_shuffling_result.read().unwrap() { + Some(ref r) => r.clone(), + None => panic!("TestBeaconNode: produce_result == None"), + } + } +} diff --git a/validator_client/src/duties/traits.rs b/validator_client/src/duties/traits.rs new file mode 100644 index 0000000000..38d61f967e --- /dev/null +++ b/validator_client/src/duties/traits.rs @@ -0,0 +1,19 @@ +use super::EpochDuties; +use bls::PublicKey; + +#[derive(Debug, PartialEq, Clone)] +pub enum BeaconNodeError { + RemoteFailure(String), +} + +/// Defines the methods required to obtain a validators shuffling from a Beacon Node. +pub trait BeaconNode: Send + Sync { + /// Get the shuffling for the given epoch and public key. + /// + /// Returns Ok(None) if the public key is unknown, or the shuffling for that epoch is unknown. + fn request_shuffling( + &self, + epoch: u64, + public_key: &PublicKey, + ) -> Result, BeaconNodeError>; +} diff --git a/validator_client/src/main.rs b/validator_client/src/main.rs new file mode 100644 index 0000000000..bbbc0b4c35 --- /dev/null +++ b/validator_client/src/main.rs @@ -0,0 +1,166 @@ +use self::duties::{DutiesManager, DutiesManagerService, EpochDutiesMap}; +use crate::block_producer::{BlockProducer, BlockProducerService}; +use crate::config::ClientConfig; +use bls::Keypair; +use clap::{App, Arg}; +use grpcio::{ChannelBuilder, EnvBuilder}; +use protos::services_grpc::{BeaconBlockServiceClient, ValidatorServiceClient}; +use slog::{error, info, o, Drain}; +use slot_clock::SystemTimeSlotClock; +use spec::ChainSpec; +use std::path::PathBuf; +use std::sync::{Arc, RwLock}; +use std::thread; + +mod block_producer; +mod config; +mod duties; + +fn main() { + // Logging + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::CompactFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + let log = slog::Logger::root(drain, o!()); + + // CLI + let matches = App::new("Lighthouse Validator Client") + .version("0.0.1") + .author("Sigma Prime ") + .about("Eth 2.0 Validator Client") + .arg( + Arg::with_name("datadir") + .long("datadir") + .value_name("DIR") + .help("Data directory for keys and databases.") + .takes_value(true), + ) + .arg( + Arg::with_name("server") + .long("server") + .value_name("server") + .help("Address to connect to BeaconNode.") + .takes_value(true), + ) + .get_matches(); + + let mut config = ClientConfig::default(); + + // Custom datadir + if let Some(dir) = matches.value_of("datadir") { + config.data_dir = PathBuf::from(dir.to_string()); + } + + // Custom server port + if let Some(server_str) = matches.value_of("server") { + if let Ok(addr) = server_str.parse::() { + config.server = addr.to_string(); + } else { + error!(log, "Invalid address"; "server" => server_str); + return; + } + } + + // Log configuration + info!(log, ""; + "data_dir" => &config.data_dir.to_str(), + "server" => &config.server); + + // Beacon node gRPC beacon block endpoints. + let beacon_block_grpc_client = { + let env = Arc::new(EnvBuilder::new().build()); + let ch = ChannelBuilder::new(env).connect(&config.server); + Arc::new(BeaconBlockServiceClient::new(ch)) + }; + + // Beacon node gRPC validator endpoints. + let validator_grpc_client = { + let env = Arc::new(EnvBuilder::new().build()); + let ch = ChannelBuilder::new(env).connect(&config.server); + Arc::new(ValidatorServiceClient::new(ch)) + }; + + // Ethereum + // + // TODO: Permit loading a custom spec from file. + // https://github.com/sigp/lighthouse/issues/160 + let spec = Arc::new(ChainSpec::foundation()); + + // Clock for determining the present slot. + let slot_clock = { + info!(log, "Genesis time"; "unix_epoch_seconds" => spec.genesis_time); + let clock = SystemTimeSlotClock::new(spec.genesis_time, spec.slot_duration) + .expect("Unable to instantiate SystemTimeSlotClock."); + Arc::new(RwLock::new(clock)) + }; + + let poll_interval_millis = spec.slot_duration * 1000 / 10; // 10% epoch time precision. + info!(log, "Starting block producer service"; "polls_per_epoch" => spec.slot_duration * 1000 / poll_interval_millis); + + /* + * Start threads. + */ + let mut threads = vec![]; + // TODO: keypairs are randomly generated; they should be loaded from a file or generated. + // https://github.com/sigp/lighthouse/issues/160 + let keypairs = vec![Keypair::random()]; + + for keypair in keypairs { + info!(log, "Starting validator services"; "validator" => keypair.pk.concatenated_hex_id()); + let duties_map = Arc::new(RwLock::new(EpochDutiesMap::new())); + + // Spawn a new thread to maintain the validator's `EpochDuties`. + let duties_manager_thread = { + let spec = spec.clone(); + let duties_map = duties_map.clone(); + let slot_clock = slot_clock.clone(); + let log = log.clone(); + let beacon_node = validator_grpc_client.clone(); + let pubkey = keypair.pk.clone(); + thread::spawn(move || { + let manager = DutiesManager { + duties_map, + pubkey, + spec, + slot_clock, + beacon_node, + }; + let mut duties_manager_service = DutiesManagerService { + manager, + poll_interval_millis, + log, + }; + + duties_manager_service.run(); + }) + }; + + // Spawn a new thread to perform block production for the validator. + let producer_thread = { + let spec = spec.clone(); + let duties_map = duties_map.clone(); + let slot_clock = slot_clock.clone(); + let log = log.clone(); + let client = beacon_block_grpc_client.clone(); + thread::spawn(move || { + let block_producer = BlockProducer::new(spec, duties_map, slot_clock, client); + let mut block_producer_service = BlockProducerService { + block_producer, + poll_interval_millis, + log, + }; + + block_producer_service.run(); + }) + }; + + threads.push((duties_manager_thread, producer_thread)); + } + + // Naively wait for all the threads to complete. + for tuple in threads { + let (manager, producer) = tuple; + let _ = producer.join(); + let _ = manager.join(); + } +}