diff --git a/validator_client/src/block_producer/mod.rs b/validator_client/src/block_producer/mod.rs index 2efcbd6ef8..db65ee7896 100644 --- a/validator_client/src/block_producer/mod.rs +++ b/validator_client/src/block_producer/mod.rs @@ -1,11 +1,11 @@ mod grpc; mod service; +#[cfg(test)] mod test_node; mod traits; use self::traits::{BeaconNode, BeaconNodeError}; use super::EpochDutiesMap; -use crate::duties::EpochDuties; use slot_clock::SlotClock; use spec::ChainSpec; use std::sync::{Arc, RwLock}; @@ -139,6 +139,7 @@ mod tests { use super::*; use slot_clock::TestingSlotClock; use types::test_utils::{SeedableRng, TestRandom, XorShiftRng}; + use crate::duties::EpochDuties; // TODO: implement more thorough testing. // diff --git a/validator_client/src/duties/grpc.rs b/validator_client/src/duties/grpc.rs index 4032d49f3e..4ea9c6d41e 100644 --- a/validator_client/src/duties/grpc.rs +++ b/validator_client/src/duties/grpc.rs @@ -1,63 +1,34 @@ +use super::EpochDuties; use super::traits::{BeaconNode, BeaconNodeError}; -use protos::services::{ - BeaconBlock as GrpcBeaconBlock, ProduceBeaconBlockRequest, PublishBeaconBlockRequest, -}; +use protos::services::ValidatorAssignmentRequest; use protos::services_grpc::BeaconBlockServiceClient; -use ssz::{ssz_encode, Decodable}; -use types::{BeaconBlock, BeaconBlockBody, Hash256, Signature}; +use ssz::ssz_encode; +use types::{PublicKey}; impl BeaconNode for BeaconBlockServiceClient { - fn produce_beacon_block(&self, slot: u64) -> Result, BeaconNodeError> { - let mut req = ProduceBeaconBlockRequest::new(); - req.set_slot(slot); + fn request_shuffling(&self, epoch: u64, public_key: &PublicKey) -> Result, BeaconNodeError> { + let mut req = ValidatorAssignmentRequest::new(); + req.set_epoch(epoch); + req.set_public_key(ssz_encode(public_key).to_vec()); let reply = self - .produce_beacon_block(&req) + .validator_assignment(&req) .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; - if reply.has_block() { - let block = reply.get_block(); + if reply.has_validator_assignment() { + let assignment = reply.get_validator_assignment(); - let (signature, _) = Signature::ssz_decode(block.get_signature(), 0) - .map_err(|_| BeaconNodeError::DecodeFailure)?; + let block_production_slot = if assignment.has_block_production_slot() { + Some(assignment.get_block_production_slot()) + } else { + None + }; - // TODO: this conversion is incomplete; fix it. - Ok(Some(BeaconBlock { - slot: block.get_slot(), - parent_root: Hash256::zero(), - state_root: Hash256::zero(), - randao_reveal: Hash256::from(block.get_randao_reveal()), - candidate_pow_receipt_root: Hash256::zero(), - signature, - body: BeaconBlockBody { - proposer_slashings: vec![], - casper_slashings: vec![], - attestations: vec![], - deposits: vec![], - exits: vec![], - }, - })) + let duties = EpochDuties { block_production_slot }; + + Ok(Some(duties)) } else { Ok(None) } } - - fn publish_beacon_block(&self, block: BeaconBlock) -> Result { - let mut req = PublishBeaconBlockRequest::new(); - - // TODO: this conversion is incomplete; fix it. - let mut grpc_block = GrpcBeaconBlock::new(); - grpc_block.set_slot(block.slot); - grpc_block.set_block_root(vec![0]); - grpc_block.set_randao_reveal(block.randao_reveal.to_vec()); - grpc_block.set_signature(ssz_encode(&block.signature)); - - req.set_block(grpc_block); - - let reply = self - .publish_beacon_block(&req) - .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; - - Ok(reply.get_success()) - } } diff --git a/validator_client/src/duties/mod.rs b/validator_client/src/duties/mod.rs index d401d350e9..0ac14b07f5 100644 --- a/validator_client/src/duties/mod.rs +++ b/validator_client/src/duties/mod.rs @@ -1,3 +1,9 @@ +mod grpc; +mod service; +#[cfg(test)] +mod test_node; +mod traits; + use self::traits::{BeaconNode, BeaconNodeError}; use bls::PublicKey; use slot_clock::SlotClock; @@ -5,10 +11,6 @@ use spec::ChainSpec; use std::collections::HashMap; use std::sync::{Arc, RwLock}; -mod service; -mod test_node; -mod traits; - pub use self::service::DutiesManagerService; #[derive(Debug, PartialEq, Clone, Copy, Default)] diff --git a/validator_client/src/duties/traits.rs b/validator_client/src/duties/traits.rs index eb0f1583e4..14c2adf954 100644 --- a/validator_client/src/duties/traits.rs +++ b/validator_client/src/duties/traits.rs @@ -4,7 +4,6 @@ use bls::PublicKey; #[derive(Debug, PartialEq, Clone)] pub enum BeaconNodeError { RemoteFailure(String), - DecodeFailure, } pub trait BeaconNode: Send + Sync { diff --git a/validator_client/src/main.rs b/validator_client/src/main.rs index 179146897f..187b40f04b 100644 --- a/validator_client/src/main.rs +++ b/validator_client/src/main.rs @@ -96,6 +96,25 @@ fn main() { for keypair in keypairs { let duties_map = Arc::new(RwLock::new(EpochDutiesMap::new())); + let duties_manager_thread = { + let spec = spec.clone(); + let duties_map = duties_map.clone(); + let slot_clock = slot_clock.clone(); + let log = log.clone(); + let beacon_node = client.clone(); + let pubkey = keypair.pk.clone(); + thread::spawn(move || { + let manager = DutiesManager { duties_map, pubkey, spec, slot_clock, beacon_node }; + let mut duties_manager_service = DutiesManagerService { + manager, + poll_interval_millis, + log, + }; + + duties_manager_service.run(); + }) + }; + let producer_thread = { let spec = spec.clone(); let duties_map = duties_map.clone(); @@ -113,11 +132,13 @@ fn main() { block_producer_service.run(); }) }; - threads.push(((), producer_thread)); + + threads.push((duties_manager_thread, producer_thread)); } for tuple in threads { let (manager, producer) = tuple; let _ = producer.join(); + let _ = manager.join(); } }