diff --git a/validator_client/src/block_producer/grpc.rs b/validator_client/src/block_producer/grpc.rs index ca47ee3a6b..20ced3d0c3 100644 --- a/validator_client/src/block_producer/grpc.rs +++ b/validator_client/src/block_producer/grpc.rs @@ -63,7 +63,7 @@ impl BeaconNode for BeaconBlockServiceClient { let mut grpc_block = GrpcBeaconBlock::new(); grpc_block.set_slot(block.slot); grpc_block.set_block_root(vec![0]); - grpc_block.set_randao_reveal(block.randao_reveal.to_vec()); + grpc_block.set_randao_reveal(ssz_encode(&block.randao_reveal)); grpc_block.set_signature(ssz_encode(&block.signature)); req.set_block(grpc_block); diff --git a/validator_client/src/block_producer/mod.rs b/validator_client/src/block_producer/mod.rs index f3cd0199bb..e0ea220b6a 100644 --- a/validator_client/src/block_producer/mod.rs +++ b/validator_client/src/block_producer/mod.rs @@ -2,10 +2,9 @@ mod grpc; mod service; #[cfg(test)] mod test_node; -mod traits; +pub mod traits; -use self::traits::{BeaconNode, BeaconNodeError}; -use super::EpochDutiesMap; +use self::traits::{BeaconNode, BeaconNodeError, DutiesReader, DutiesReaderError}; use slot_clock::SlotClock; use spec::ChainSpec; use std::sync::{Arc, RwLock}; @@ -45,19 +44,19 @@ pub enum Error { /// Ensures that messages are not slashable. /// /// Relies upon an external service to keep the `EpochDutiesMap` updated. -pub struct BlockProducer { +pub struct BlockProducer { pub last_processed_slot: u64, spec: Arc, - epoch_map: Arc>, + epoch_map: Arc, slot_clock: Arc>, beacon_node: Arc, } -impl BlockProducer { +impl BlockProducer { /// Returns a new instance where `last_processed_slot == 0`. pub fn new( spec: Arc, - epoch_map: Arc>, + epoch_map: Arc, slot_clock: Arc>, beacon_node: Arc, ) -> Self { @@ -71,7 +70,7 @@ impl BlockProducer { } } -impl BlockProducer { +impl BlockProducer { /// "Poll" to see if the validator is required to take any action. /// /// The slot clock will be read and any new actions undertaken. @@ -90,13 +89,14 @@ impl BlockProducer { // If this is a new slot. if slot > self.last_processed_slot { - let is_block_production_slot = { - let epoch_map = self.epoch_map.read().map_err(|_| Error::EpochMapPoisoned)?; - match epoch_map.get(&epoch) { - None => return Ok(PollOutcome::ProducerDutiesUnknown(slot)), - Some(duties) => duties.is_block_production_slot(slot), - } - }; + let is_block_production_slot = + match self.epoch_map.is_block_production_slot(epoch, slot) { + Ok(result) => result, + Err(DutiesReaderError::UnknownEpoch) => { + return Ok(PollOutcome::ProducerDutiesUnknown(slot)) + } + Err(DutiesReaderError::Poisoned) => return Err(Error::EpochMapPoisoned), + }; if is_block_production_slot { self.last_processed_slot = slot; @@ -178,6 +178,7 @@ mod tests { use super::test_node::TestBeaconNode; use super::*; use crate::duties::EpochDuties; + use crate::duties::EpochDutiesMap; use slot_clock::TestingSlotClock; use types::test_utils::{SeedableRng, TestRandom, XorShiftRng}; @@ -191,7 +192,7 @@ mod tests { let mut rng = XorShiftRng::from_seed([42; 16]); let spec = Arc::new(ChainSpec::foundation()); - let epoch_map = Arc::new(RwLock::new(EpochDutiesMap::new())); + let epoch_map = Arc::new(EpochDutiesMap::new()); let slot_clock = Arc::new(RwLock::new(TestingSlotClock::new(0))); let beacon_node = Arc::new(TestBeaconNode::default()); @@ -213,7 +214,7 @@ mod tests { ..std::default::Default::default() }; let produce_epoch = produce_slot / spec.epoch_length; - epoch_map.write().unwrap().insert(produce_epoch, duties); + epoch_map.insert(produce_epoch, duties); // One slot before production slot... slot_clock.write().unwrap().set_slot(produce_slot - 1); diff --git a/validator_client/src/block_producer/service.rs b/validator_client/src/block_producer/service.rs index ffdb330293..822df76c1e 100644 --- a/validator_client/src/block_producer/service.rs +++ b/validator_client/src/block_producer/service.rs @@ -1,15 +1,15 @@ -use super::traits::BeaconNode; +use super::traits::{BeaconNode, DutiesReader}; use super::{BlockProducer, PollOutcome as BlockProducerPollOutcome, SlotClock}; use slog::{error, info, warn, Logger}; use std::time::Duration; -pub struct BlockProducerService { - pub block_producer: BlockProducer, +pub struct BlockProducerService { + pub block_producer: BlockProducer, pub poll_interval_millis: u64, pub log: Logger, } -impl BlockProducerService { +impl BlockProducerService { /// Run a loop which polls the block producer each `poll_interval_millis` millseconds. /// /// Logs the results of the polls. diff --git a/validator_client/src/block_producer/traits.rs b/validator_client/src/block_producer/traits.rs index be1c73bda5..e16af24606 100644 --- a/validator_client/src/block_producer/traits.rs +++ b/validator_client/src/block_producer/traits.rs @@ -17,3 +17,12 @@ pub trait BeaconNode: Send + Sync { /// Returns `true` if the publish was sucessful. fn publish_beacon_block(&self, block: BeaconBlock) -> Result; } + +pub enum DutiesReaderError { + UnknownEpoch, + Poisoned, +} + +pub trait DutiesReader: Send + Sync { + fn is_block_production_slot(&self, epoch: u64, slot: u64) -> Result; +} diff --git a/validator_client/src/duties/mod.rs b/validator_client/src/duties/mod.rs index 4656715bae..8e70195333 100644 --- a/validator_client/src/duties/mod.rs +++ b/validator_client/src/duties/mod.rs @@ -5,6 +5,7 @@ mod test_node; mod traits; use self::traits::{BeaconNode, BeaconNodeError}; +use super::block_producer::traits::{DutiesReader, DutiesReaderError}; use bls::PublicKey; use slot_clock::SlotClock; use spec::ChainSpec; @@ -36,8 +37,52 @@ impl EpochDuties { } } +pub enum EpochDutiesMapError { + Poisoned, +} + /// Maps an `epoch` to some `EpochDuties` for a single validator. -pub type EpochDutiesMap = HashMap; +pub struct EpochDutiesMap { + pub map: RwLock>, +} + +impl EpochDutiesMap { + pub fn new() -> Self { + Self { + map: RwLock::new(HashMap::new()), + } + } + + pub fn get(&self, epoch: u64) -> Result, EpochDutiesMapError> { + let map = self.map.read().map_err(|_| EpochDutiesMapError::Poisoned)?; + match map.get(&epoch) { + Some(duties) => Ok(Some(duties.clone())), + None => Ok(None), + } + } + + pub fn insert( + &self, + epoch: u64, + epoch_duties: EpochDuties, + ) -> Result, EpochDutiesMapError> { + let mut map = self + .map + .write() + .map_err(|_| EpochDutiesMapError::Poisoned)?; + Ok(map.insert(epoch, epoch_duties)) + } +} + +impl DutiesReader for EpochDutiesMap { + fn is_block_production_slot(&self, epoch: u64, slot: u64) -> Result { + let map = self.map.read().map_err(|_| DutiesReaderError::Poisoned)?; + let duties = map + .get(&epoch) + .ok_or_else(|| DutiesReaderError::UnknownEpoch)?; + Ok(duties.is_block_production_slot(slot)) + } +} #[derive(Debug, PartialEq, Clone, Copy)] pub enum PollOutcome { @@ -68,7 +113,7 @@ pub enum Error { /// /// There is a single `DutiesManager` per validator instance. pub struct DutiesManager { - pub duties_map: Arc>, + pub duties_map: Arc, /// The validator's public key. pub pubkey: PublicKey, pub spec: Arc, @@ -95,14 +140,9 @@ impl DutiesManager { .ok_or(Error::EpochLengthIsZero)?; if let Some(duties) = self.beacon_node.request_shuffling(epoch, &self.pubkey)? { - let mut map = self - .duties_map - .write() - .map_err(|_| Error::EpochMapPoisoned)?; - // If these duties were known, check to see if they're updates or identical. - let result = if let Some(known_duties) = map.get(&epoch) { - if *known_duties == duties { + let result = if let Some(known_duties) = self.duties_map.get(epoch)? { + if known_duties == duties { Ok(PollOutcome::NoChange(epoch)) } else { Ok(PollOutcome::DutiesChanged(epoch, duties)) @@ -110,7 +150,7 @@ impl DutiesManager { } else { Ok(PollOutcome::NewDuties(epoch, duties)) }; - map.insert(epoch, duties); + self.duties_map.insert(epoch, duties)?; result } else { Ok(PollOutcome::UnknownValidatorOrEpoch(epoch)) @@ -124,6 +164,14 @@ impl From for Error { } } +impl From for Error { + fn from(e: EpochDutiesMapError) -> Error { + match e { + EpochDutiesMapError::Poisoned => Error::EpochMapPoisoned, + } + } +} + #[cfg(test)] mod tests { use super::test_node::TestBeaconNode; @@ -139,7 +187,7 @@ mod tests { #[test] pub fn polling() { let spec = Arc::new(ChainSpec::foundation()); - let duties_map = Arc::new(RwLock::new(EpochDutiesMap::new())); + let duties_map = Arc::new(EpochDutiesMap::new()); let keypair = Keypair::random(); let slot_clock = Arc::new(RwLock::new(TestingSlotClock::new(0))); let beacon_node = Arc::new(TestBeaconNode::default()); diff --git a/validator_client/src/main.rs b/validator_client/src/main.rs index bbbc0b4c35..c1775f8264 100644 --- a/validator_client/src/main.rs +++ b/validator_client/src/main.rs @@ -107,7 +107,7 @@ fn main() { for keypair in keypairs { info!(log, "Starting validator services"; "validator" => keypair.pk.concatenated_hex_id()); - let duties_map = Arc::new(RwLock::new(EpochDutiesMap::new())); + let duties_map = Arc::new(EpochDutiesMap::new()); // Spawn a new thread to maintain the validator's `EpochDuties`. let duties_manager_thread = {