diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index a090c1cc54..37d96a4974 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -9,7 +9,7 @@ types = { path = "../eth2/types" } client = { path = "client" } version = { path = "version" } clap = "2.32.0" -slog = "^2.2.3" +slog = { version = "^2.2.3" , features = ["max_level_trace", "release_max_level_debug"] } slog-term = "^2.4.0" slog-async = "^2.3.0" ctrlc = { version = "3.1.1", features = ["termination"] } diff --git a/beacon_node/beacon_chain/src/attestation_aggregator.rs b/beacon_node/beacon_chain/src/attestation_aggregator.rs deleted file mode 100644 index 9b4e5a6874..0000000000 --- a/beacon_node/beacon_chain/src/attestation_aggregator.rs +++ /dev/null @@ -1,218 +0,0 @@ -use ssz::TreeHash; -use state_processing::per_block_processing::validate_attestation_without_signature; -use std::collections::{HashMap, HashSet}; -use types::*; - -const PHASE_0_CUSTODY_BIT: bool = false; - -/// Provides the functionality to: -/// -/// - Recieve a `FreeAttestation` and aggregate it into an `Attestation` (or create a new if it -/// doesn't exist). -/// - Store all aggregated or created `Attestation`s. -/// - Produce a list of attestations that would be valid for inclusion in some `BeaconState` (and -/// therefore valid for inclusion in a `BeaconBlock`. -/// -/// Note: `Attestations` are stored in memory and never deleted. This is not scalable and must be -/// rectified in a future revision. -#[derive(Default)] -pub struct AttestationAggregator { - store: HashMap, Attestation>, -} - -pub struct Outcome { - pub valid: bool, - pub message: Message, -} - -pub enum Message { - /// The free attestation was added to an existing attestation. - Aggregated, - /// The free attestation has already been aggregated to an existing attestation. - AggregationNotRequired, - /// The free attestation was transformed into a new attestation. 
- NewAttestationCreated, - /// The supplied `validator_index` is not in the committee for the given `shard` and `slot`. - BadValidatorIndex, - /// The given `signature` did not match the `pubkey` in the given - /// `state.validator_registry`. - BadSignature, - /// The given `slot` does not match the validators committee assignment. - BadSlot, - /// The given `shard` does not match the validators committee assignment, or is not included in - /// a committee for the given slot. - BadShard, - /// Attestation is from the epoch prior to this, ignoring. - TooOld, -} - -macro_rules! valid_outcome { - ($error: expr) => { - return Ok(Outcome { - valid: true, - message: $error, - }); - }; -} - -macro_rules! invalid_outcome { - ($error: expr) => { - return Ok(Outcome { - valid: false, - message: $error, - }); - }; -} - -impl AttestationAggregator { - /// Instantiates a new AttestationAggregator with an empty database. - pub fn new() -> Self { - Self { - store: HashMap::new(), - } - } - - /// Accepts some `FreeAttestation`, validates it and either aggregates it upon some existing - /// `Attestation` or produces a new `Attestation`. - /// - /// The "validation" provided is not complete, instead the following points are checked: - /// - The given `validator_index` is in the committee for the given `shard` for the given - /// `slot`. - /// - The signature is verified against that of the validator at `validator_index`. 
- pub fn process_free_attestation( - &mut self, - state: &BeaconState, - free_attestation: &FreeAttestation, - spec: &ChainSpec, - ) -> Result { - let duties = - match state.get_attestation_duties(free_attestation.validator_index as usize, spec) { - Err(BeaconStateError::EpochCacheUninitialized(e)) => { - panic!("Attempted to access unbuilt cache {:?}.", e) - } - Err(BeaconStateError::EpochOutOfBounds) => invalid_outcome!(Message::TooOld), - Err(BeaconStateError::ShardOutOfBounds) => invalid_outcome!(Message::BadShard), - Err(e) => return Err(e), - Ok(None) => invalid_outcome!(Message::BadValidatorIndex), - Ok(Some(attestation_duties)) => attestation_duties, - }; - - if free_attestation.data.slot != duties.slot { - invalid_outcome!(Message::BadSlot); - } - if free_attestation.data.shard != duties.shard { - invalid_outcome!(Message::BadShard); - } - - let signable_message = AttestationDataAndCustodyBit { - data: free_attestation.data.clone(), - custody_bit: PHASE_0_CUSTODY_BIT, - } - .hash_tree_root(); - - let validator_record = match state - .validator_registry - .get(free_attestation.validator_index as usize) - { - None => invalid_outcome!(Message::BadValidatorIndex), - Some(validator_record) => validator_record, - }; - - if !free_attestation.signature.verify( - &signable_message, - spec.get_domain(state.current_epoch(spec), Domain::Attestation, &state.fork), - &validator_record.pubkey, - ) { - invalid_outcome!(Message::BadSignature); - } - - if let Some(existing_attestation) = self.store.get(&signable_message) { - if let Some(updated_attestation) = aggregate_attestation( - existing_attestation, - &free_attestation.signature, - duties.committee_index as usize, - ) { - self.store.insert(signable_message, updated_attestation); - valid_outcome!(Message::Aggregated); - } else { - valid_outcome!(Message::AggregationNotRequired); - } - } else { - let mut aggregate_signature = AggregateSignature::new(); - aggregate_signature.add(&free_attestation.signature); - let mut 
aggregation_bitfield = Bitfield::new(); - aggregation_bitfield.set(duties.committee_index as usize, true); - let new_attestation = Attestation { - data: free_attestation.data.clone(), - aggregation_bitfield, - custody_bitfield: Bitfield::new(), - aggregate_signature, - }; - self.store.insert(signable_message, new_attestation); - valid_outcome!(Message::NewAttestationCreated); - } - } - - /// Returns all known attestations which are: - /// - /// - Valid for the given state - /// - Not already in `state.latest_attestations`. - pub fn get_attestations_for_state( - &self, - state: &BeaconState, - spec: &ChainSpec, - ) -> Vec { - let mut known_attestation_data: HashSet = HashSet::new(); - - state - .previous_epoch_attestations - .iter() - .chain(state.current_epoch_attestations.iter()) - .for_each(|attestation| { - known_attestation_data.insert(attestation.data.clone()); - }); - - self.store - .values() - .filter_map(|attestation| { - if validate_attestation_without_signature(&state, attestation, spec).is_ok() - && !known_attestation_data.contains(&attestation.data) - { - Some(attestation.clone()) - } else { - None - } - }) - .collect() - } -} - -/// Produces a new `Attestation` where: -/// -/// - `signature` is added to `Attestation.aggregate_signature` -/// - Attestation.aggregation_bitfield[committee_index]` is set to true. 
-fn aggregate_attestation( - existing_attestation: &Attestation, - signature: &Signature, - committee_index: usize, -) -> Option { - let already_signed = existing_attestation - .aggregation_bitfield - .get(committee_index) - .unwrap_or(false); - - if already_signed { - None - } else { - let mut aggregation_bitfield = existing_attestation.aggregation_bitfield.clone(); - aggregation_bitfield.set(committee_index, true); - let mut aggregate_signature = existing_attestation.aggregate_signature.clone(); - aggregate_signature.add(&signature); - - Some(Attestation { - aggregation_bitfield, - aggregate_signature, - ..existing_attestation.clone() - }) - } -} diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7c2336a28b..600c453fdf 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -11,7 +11,7 @@ use operation_pool::OperationPool; use parking_lot::{RwLock, RwLockReadGuard}; use slot_clock::SlotClock; use ssz::ssz_encode; -pub use state_processing::per_block_processing::errors::{ +use state_processing::per_block_processing::errors::{ AttestationValidationError, AttesterSlashingValidationError, DepositValidationError, ExitValidationError, ProposerSlashingValidationError, TransferValidationError, }; @@ -130,10 +130,7 @@ where state_root, )); - genesis_state.build_epoch_cache(RelativeEpoch::Previous, &spec)?; - genesis_state.build_epoch_cache(RelativeEpoch::Current, &spec)?; - genesis_state.build_epoch_cache(RelativeEpoch::NextWithoutRegistryChange, &spec)?; - genesis_state.build_epoch_cache(RelativeEpoch::NextWithRegistryChange, &spec)?; + genesis_state.build_all_caches(&spec)?; Ok(Self { block_store, @@ -293,7 +290,7 @@ where /// fork-choice rule). /// /// It is important to note that the `beacon_state` returned may not match the present slot. 
It - /// is the state as it was when the head block was recieved, which could be some slots prior to + /// is the state as it was when the head block was received, which could be some slots prior to /// now. pub fn head(&self) -> RwLockReadGuard { self.canonical_head.read() @@ -318,6 +315,8 @@ where per_slot_processing(&mut state, &latest_block_header, &self.spec)?; } + state.build_all_caches(&self.spec)?; + *self.state.write() = state; Ok(()) @@ -342,11 +341,17 @@ where per_slot_processing(&mut *state, &latest_block_header, &self.spec)?; } - state.build_epoch_cache(RelativeEpoch::Previous, &self.spec)?; - state.build_epoch_cache(RelativeEpoch::Current, &self.spec)?; - state.build_epoch_cache(RelativeEpoch::NextWithoutRegistryChange, &self.spec)?; - state.build_epoch_cache(RelativeEpoch::NextWithRegistryChange, &self.spec)?; - state.update_pubkey_cache()?; + + state.build_all_caches(&self.spec)?; + + Ok(()) + } + + /// Build all of the caches on the current state. + /// + /// Ideally this shouldn't be required, however we leave it here for testing. + pub fn ensure_state_caches_are_built(&self) -> Result<(), Error> { + self.state.write().build_all_caches(&self.spec)?; Ok(()) } @@ -477,14 +482,37 @@ where trace!("BeaconChain::produce_attestation: shard: {}", shard); let state = self.state.read(); - let target_root = *self.state.read().get_block_root( - self.state + let current_epoch_start_slot = self + .state + .read() + .slot + .epoch(self.spec.slots_per_epoch) + .start_slot(self.spec.slots_per_epoch); + + let target_root = if state.slot == current_epoch_start_slot { + // If we're on the first slot of the state's epoch. + if self.head().beacon_block.slot == state.slot { + // If the current head block is from the current slot, use its block root. + self.head().beacon_block_root + } else { + // If the current head block is not from this slot, use the slot from the previous + // epoch. 
+ let root = *self.state.read().get_block_root( + current_epoch_start_slot - self.spec.slots_per_epoch, + &self.spec, + )?; + + root + } + } else { + // If we're not on the first slot of the epoch. + let root = *self + .state .read() - .slot - .epoch(self.spec.slots_per_epoch) - .start_slot(self.spec.slots_per_epoch), - &self.spec, - )?; + .get_block_root(current_epoch_start_slot, &self.spec)?; + + root + }; Ok(AttestationData { slot: self.state.read().slot, @@ -492,10 +520,7 @@ where beacon_block_root: self.head().beacon_block_root, target_root, crosslink_data_root: Hash256::zero(), - previous_crosslink: Crosslink { - epoch: self.state.read().slot.epoch(self.spec.slots_per_epoch), - crosslink_data_root: Hash256::zero(), - }, + previous_crosslink: state.latest_crosslinks[shard as usize].clone(), source_epoch: state.current_justified_epoch, source_root: state.current_justified_root, }) diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 48a42b941e..d8d85a8a6c 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -1,4 +1,3 @@ -mod attestation_aggregator; mod beacon_chain; mod checkpoint; mod errors; @@ -7,10 +6,13 @@ pub mod test_utils; pub use self::beacon_chain::{BeaconChain, BlockProcessingOutcome, InvalidBlock, ValidBlock}; pub use self::checkpoint::CheckPoint; -pub use self::errors::BeaconChainError; -pub use attestation_aggregator::Outcome as AggregationOutcome; +pub use self::errors::{BeaconChainError, BlockProductionError}; pub use db; pub use fork_choice; pub use parking_lot; pub use slot_clock; +pub use state_processing::per_block_processing::errors::{ + AttestationValidationError, AttesterSlashingValidationError, DepositValidationError, + ExitValidationError, ProposerSlashingValidationError, TransferValidationError, +}; pub use types; diff --git a/beacon_node/beacon_chain/test_harness/specs/validator_registry.yaml 
b/beacon_node/beacon_chain/test_harness/specs/validator_registry.yaml index 1674ecffc7..ad9c899cfe 100644 --- a/beacon_node/beacon_chain/test_harness/specs/validator_registry.yaml +++ b/beacon_node/beacon_chain/test_harness/specs/validator_registry.yaml @@ -48,7 +48,8 @@ test_cases: - slot: 63 num_validators: 1003 num_previous_epoch_attestations: 0 - num_current_epoch_attestations: 10 + # slots_per_epoch - attestation_inclusion_delay - skip_slots + num_current_epoch_attestations: 57 slashed_validators: [11, 12, 13, 14, 42] exited_validators: [] exit_initiated_validators: [50] diff --git a/beacon_node/beacon_chain/test_harness/src/validator_harness/direct_beacon_node.rs b/beacon_node/beacon_chain/test_harness/src/validator_harness/direct_beacon_node.rs index 7853459d7e..2d2b9e84dc 100644 --- a/beacon_node/beacon_chain/test_harness/src/validator_harness/direct_beacon_node.rs +++ b/beacon_node/beacon_chain/test_harness/src/validator_harness/direct_beacon_node.rs @@ -50,7 +50,7 @@ impl DirectBeaconNode { } impl AttesterBeaconNode for DirectBeaconNode { - fn produce_attestation( + fn produce_attestation_data( &self, _slot: Slot, shard: u64, diff --git a/beacon_node/client/src/client_config.rs b/beacon_node/client/src/client_config.rs index cad287f2cc..407171ff58 100644 --- a/beacon_node/client/src/client_config.rs +++ b/beacon_node/client/src/client_config.rs @@ -10,6 +10,7 @@ use std::path::PathBuf; use types::multiaddr::Protocol; use types::multiaddr::ToMultiaddr; use types::ChainSpec; +use types::Multiaddr; /// Stores the client configuration for this Lighthouse instance. 
#[derive(Debug, Clone)] @@ -76,7 +77,7 @@ impl ClientConfig { } // Custom listening address ipv4/ipv6 // TODO: Handle list of addresses - if let Some(listen_address_str) = args.value_of("listen_address") { + if let Some(listen_address_str) = args.value_of("listen-address") { if let Ok(listen_address) = listen_address_str.parse::() { let multiaddr = SocketAddr::new(listen_address, config.net_conf.listen_port) .to_multiaddr() @@ -88,6 +89,17 @@ impl ClientConfig { } } + // Custom bootnodes + // TODO: Handle list of addresses + if let Some(boot_addresses_str) = args.value_of("boot-nodes") { + if let Ok(boot_address) = boot_addresses_str.parse::() { + config.net_conf.boot_nodes.append(&mut vec![boot_address]); + } else { + error!(log, "Invalid Bootnode multiaddress"; "Multiaddr" => boot_addresses_str); + return Err("Invalid IP Address"); + } + } + /* Filesystem related arguments */ // Custom datadir diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 807fd9301e..6b4277c264 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -15,13 +15,11 @@ use futures::{future::Future, Stream}; use network::Service as NetworkService; use slog::{error, info, o}; use slot_clock::SlotClock; -use ssz::TreeHash; use std::marker::PhantomData; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::runtime::TaskExecutor; use tokio::timer::Interval; -use types::Hash256; /// Main beacon node client service. This provides the connection and initialisation of the clients /// sub-services in multiple threads. 
diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index 91a9f3a261..1a5ecbb53b 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -22,13 +22,13 @@ pub fn run(client: &Client, executor: TaskExecutor, exit: Exi // build heartbeat logic here let heartbeat = move |_| { - debug!(log, "Temp heartbeat output"); + //debug!(log, "Temp heartbeat output"); //TODO: Remove this logic. Testing only let mut count = counter.lock().unwrap(); *count += 1; if *count % 5 == 0 { - debug!(log, "Sending Message"); + // debug!(log, "Sending Message"); network.send_message(); } diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 2865971838..88bfd0042a 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -1,4 +1,3 @@ -use crate::rpc::methods::BlockRootSlot; use crate::rpc::{RPCEvent, RPCMessage, Rpc}; use crate::NetworkConfig; use futures::prelude::*; @@ -13,10 +12,9 @@ use libp2p::{ tokio_io::{AsyncRead, AsyncWrite}, NetworkBehaviour, PeerId, }; -use slog::{debug, o, warn}; +use slog::{debug, o, trace, warn}; use ssz::{ssz_encode, Decodable, DecodeError, Encodable, SszStream}; -use ssz_derive::{Decode, Encode}; -use types::Attestation; +use types::{Attestation, BeaconBlock}; use types::{Topic, TopicHash}; /// Builds the network behaviour for the libp2p Swarm. @@ -49,7 +47,7 @@ impl NetworkBehaviourEventProcess { - debug!(self.log, "Received GossipEvent"; "msg" => format!("{:?}", gs_msg)); + trace!(self.log, "Received GossipEvent"; "msg" => format!("{:?}", gs_msg)); let pubsub_message = match PubsubMessage::ssz_decode(&gs_msg.data, 0) { //TODO: Punish peer on error @@ -198,7 +196,7 @@ pub enum BehaviourEvent { #[derive(Debug, Clone, PartialEq)] pub enum PubsubMessage { /// Gossipsub message providing notification of a new block. 
- Block(BlockRootSlot), + Block(BeaconBlock), /// Gossipsub message providing notification of a new attestation. Attestation(Attestation), } @@ -224,7 +222,7 @@ impl Decodable for PubsubMessage { let (id, index) = u32::ssz_decode(bytes, index)?; match id { 0 => { - let (block, index) = BlockRootSlot::ssz_decode(bytes, index)?; + let (block, index) = BeaconBlock::ssz_decode(bytes, index)?; Ok((PubsubMessage::Block(block), index)) } 1 => { @@ -243,10 +241,7 @@ mod test { #[test] fn ssz_encoding() { - let original = PubsubMessage::Block(BlockRootSlot { - block_root: Hash256::from_slice(&[42; 32]), - slot: Slot::new(4), - }); + let original = PubsubMessage::Block(BeaconBlock::empty(&ChainSpec::foundation())); let encoded = ssz_encode(&original); diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index ad3233be71..dc0be19a92 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -1,7 +1,7 @@ use ssz::{Decodable, DecodeError, Encodable, SszStream}; /// Available RPC methods types and ids. use ssz_derive::{Decode, Encode}; -use types::{Attestation, BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot}; +use types::{BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot}; #[derive(Debug)] /// Available Serenity Libp2p RPC methods @@ -179,6 +179,19 @@ pub struct BeaconBlockRootsResponse { pub roots: Vec, } +impl BeaconBlockRootsResponse { + /// Returns `true` if each `self.roots.slot[i]` is higher than the preceding `i`. + pub fn slots_are_ascending(&self) -> bool { + for i in 1..self.roots.len() { + if self.roots[i - 1].slot >= self.roots[i].slot { + return false; + } + } + + true + } +} + /// Contains a block root and associated slot. 
#[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BlockRootSlot { diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 0dc30cf420..f52d11ef1e 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -113,7 +113,7 @@ impl Stream for Service { topics, message, } => { - debug!(self.log, "Pubsub message received: {:?}", message); + trace!(self.log, "Pubsub message received: {:?}", message); return Ok(Async::Ready(Some(Libp2pEvent::PubsubMessage { source, topics, diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index c6411a0205..cd2c2269a6 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -13,7 +13,7 @@ beacon_chain = { path = "../beacon_chain" } eth2-libp2p = { path = "../eth2-libp2p" } version = { path = "../version" } types = { path = "../../eth2/types" } -slog = "2.4.1" +slog = { version = "^2.2.3" , features = ["max_level_trace", "release_max_level_debug"] } ssz = { path = "../../eth2/utils/ssz" } futures = "0.1.25" error-chain = "0.12.0" diff --git a/beacon_node/network/src/beacon_chain.rs b/beacon_node/network/src/beacon_chain.rs index 8ec8162ff7..827adeb3c9 100644 --- a/beacon_node/network/src/beacon_chain.rs +++ b/beacon_node/network/src/beacon_chain.rs @@ -5,12 +5,12 @@ use beacon_chain::{ parking_lot::RwLockReadGuard, slot_clock::SlotClock, types::{BeaconState, ChainSpec}, - AggregationOutcome, CheckPoint, + AttestationValidationError, CheckPoint, }; use eth2_libp2p::rpc::HelloMessage; use types::{Attestation, BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot}; -pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome}; +pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome, InvalidBlock}; /// The network's API to the beacon chain. 
pub trait BeaconChain: Send + Sync { @@ -40,7 +40,7 @@ pub trait BeaconChain: Send + Sync { fn process_attestation( &self, attestation: Attestation, - ) -> Result; + ) -> Result<(), AttestationValidationError>; fn get_block_roots( &self, @@ -126,14 +126,9 @@ where fn process_attestation( &self, - _attestation: Attestation, - ) -> Result { - // Awaiting a proper operations pool before we can import attestations. - // - // Returning a useless error for now. - // - // https://github.com/sigp/lighthouse/issues/281 - return Err(BeaconChainError::DBInconsistent("CANNOT PROCESS".into())); + attestation: Attestation, + ) -> Result<(), AttestationValidationError> { + self.process_attestation(attestation) } fn get_block_roots( diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 0efa6b96fd..098a5b4bfb 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -208,8 +208,9 @@ impl MessageHandler { fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage) { match gossip_message { PubsubMessage::Block(message) => { - self.sync - .on_block_gossip(peer_id, message, &mut self.network_context) + let _should_foward_on = + self.sync + .on_block_gossip(peer_id, message, &mut self.network_context); } PubsubMessage::Attestation(message) => { self.sync diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index b2d2b5a246..aee7eb4660 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -161,7 +161,7 @@ fn network_service( libp2p_service.swarm.send_rpc(peer_id, rpc_event); } OutgoingMessage::NotifierTest => { - debug!(log, "Received message from notifier"); + // debug!(log, "Received message from notifier"); } }; } diff --git a/beacon_node/network/src/sync/import_queue.rs b/beacon_node/network/src/sync/import_queue.rs index 17cbd2f12e..0026347eb2 100644 --- 
a/beacon_node/network/src/sync/import_queue.rs +++ b/beacon_node/network/src/sync/import_queue.rs @@ -5,7 +5,7 @@ use slog::{debug, error}; use ssz::TreeHash; use std::sync::Arc; use std::time::{Duration, Instant}; -use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Hash256}; +use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Hash256, Slot}; /// Provides a queue for fully and partially built `BeaconBlock`s. /// @@ -104,7 +104,7 @@ impl ImportQueue { } /// Returns `true` if `self.chain` has not yet processed this block. - pub fn is_new_block(&self, block_root: &Hash256) -> bool { + pub fn chain_has_not_seen_block(&self, block_root: &Hash256) -> bool { self.chain .is_new_block_root(&block_root) .unwrap_or_else(|_| { @@ -113,11 +113,36 @@ impl ImportQueue { }) } - /// Returns the index of the first new root in the list of block roots. - pub fn first_new_root(&mut self, roots: &[BlockRootSlot]) -> Option { - roots + /// Adds the `block_roots` to the partials queue. + /// + /// If a `block_root` is not in the queue and has not been processed by the chain it is added + /// to the queue and it's block root is included in the output. + pub fn enqueue_block_roots( + &mut self, + block_roots: &[BlockRootSlot], + sender: PeerId, + ) -> Vec { + let new_roots: Vec = block_roots .iter() - .position(|brs| self.is_new_block(&brs.block_root)) + // Ignore any roots already processed by the chain. + .filter(|brs| self.chain_has_not_seen_block(&brs.block_root)) + // Ignore any roots already stored in the queue. + .filter(|brs| !self.partials.iter().any(|p| p.block_root == brs.block_root)) + .cloned() + .collect(); + + new_roots.iter().for_each(|brs| { + self.partials.push(PartialBeaconBlock { + slot: brs.slot, + block_root: brs.block_root, + sender: sender.clone(), + header: None, + body: None, + inserted: Instant::now(), + }) + }); + + new_roots } /// Adds the `headers` to the `partials` queue. 
Returns a list of `Hash256` block roots for @@ -143,7 +168,7 @@ impl ImportQueue { for header in headers { let block_root = Hash256::from_slice(&header.hash_tree_root()[..]); - if self.is_new_block(&block_root) { + if self.chain_has_not_seen_block(&block_root) { self.insert_header(block_root, header, sender.clone()); required_bodies.push(block_root) } @@ -161,6 +186,12 @@ impl ImportQueue { } } + pub fn enqueue_full_blocks(&mut self, blocks: Vec, sender: PeerId) { + for block in blocks { + self.insert_full_block(block, sender.clone()); + } + } + /// Inserts a header to the queue. /// /// If the header already exists, the `inserted` time is set to `now` and not other @@ -171,11 +202,21 @@ impl ImportQueue { .iter() .position(|p| p.block_root == block_root) { + // Case 1: there already exists a partial with a matching block root. + // + // The `inserted` time is set to now and the header is replaced, regardless of whether + // it existed or not. + self.partials[i].header = Some(header); self.partials[i].inserted = Instant::now(); } else { + // Case 2: there was no partial with a matching block root. + // + // A new partial is added. This case permits adding a header without already knowing the + // root -- this is not possible in the wire protocol however we support it anyway. 
self.partials.push(PartialBeaconBlock { + slot: header.slot, block_root, - header, + header: Some(header), body: None, inserted: Instant::now(), sender, @@ -192,25 +233,54 @@ impl ImportQueue { let body_root = Hash256::from_slice(&body.hash_tree_root()[..]); self.partials.iter_mut().for_each(|mut p| { - if body_root == p.header.block_body_root { - p.inserted = Instant::now(); + if let Some(header) = &mut p.header { + if body_root == header.block_body_root { + p.inserted = Instant::now(); - if p.body.is_none() { - p.body = Some(body.clone()); - p.sender = sender.clone(); + if p.body.is_none() { + p.body = Some(body.clone()); + p.sender = sender.clone(); + } } } }); } + + /// Updates an existing `partial` with the completed block, or adds a new (complete) partial. + /// + /// If the partial already existed, the `inserted` time is set to `now`. + fn insert_full_block(&mut self, block: BeaconBlock, sender: PeerId) { + let block_root = Hash256::from_slice(&block.hash_tree_root()[..]); + + let partial = PartialBeaconBlock { + slot: block.slot, + block_root, + header: Some(block.block_header()), + body: Some(block.body), + inserted: Instant::now(), + sender, + }; + + if let Some(i) = self + .partials + .iter() + .position(|p| p.block_root == block_root) + { + self.partials[i] = partial; + } else { + self.partials.push(partial) + } + } } /// Individual components of a `BeaconBlock`, potentially all that are required to form a full /// `BeaconBlock`. #[derive(Clone, Debug)] pub struct PartialBeaconBlock { + pub slot: Slot, /// `BeaconBlock` root. pub block_root: Hash256, - pub header: BeaconBlockHeader, + pub header: Option, pub body: Option, /// The instant at which this record was created or last meaningfully modified. Used to /// determine if an entry is stale and should be removed. 
@@ -225,7 +295,7 @@ impl PartialBeaconBlock { pub fn complete(self) -> Option<(Hash256, BeaconBlock, PeerId)> { Some(( self.block_root, - self.header.into_block(self.body?), + self.header?.into_block(self.body?), self.sender, )) } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 85949fa98e..9a1e51bdd4 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -1,20 +1,25 @@ use super::import_queue::ImportQueue; -use crate::beacon_chain::BeaconChain; +use crate::beacon_chain::{BeaconChain, BlockProcessingOutcome, InvalidBlock}; use crate::message_handler::NetworkContext; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; use slog::{debug, error, info, o, warn}; +use ssz::TreeHash; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use types::{Attestation, Epoch, Hash256, Slot}; +use types::{Attestation, BeaconBlock, Epoch, Hash256, Slot}; /// The number of slots that we can import blocks ahead of us, before going into full Sync mode. const SLOT_IMPORT_TOLERANCE: u64 = 100; /// The amount of seconds a block (or partial block) may exist in the import queue. -const QUEUE_STALE_SECS: u64 = 60; +const QUEUE_STALE_SECS: u64 = 600; + +/// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it. +/// Otherwise we queue it. +const FUTURE_SLOT_TOLERANCE: u64 = 1; /// Keeps track of syncing information for known connected peers. #[derive(Clone, Copy, Debug)] @@ -358,31 +363,50 @@ impl SimpleSync { if res.roots.is_empty() { warn!( self.log, - "Peer returned empty block roots response. 
PeerId: {:?}", peer_id + "Peer returned empty block roots response"; + "peer_id" => format!("{:?}", peer_id) ); return; } - let new_root_index = self.import_queue.first_new_root(&res.roots); - - // If a new block root is found, request it and all the headers following it. - // - // We make an assumption here that if we don't know a block then we don't know of all - // it's parents. This might not be the case if syncing becomes more sophisticated. - if let Some(i) = new_root_index { - let new = &res.roots[i]; - - self.request_block_headers( - peer_id, - BeaconBlockHeadersRequest { - start_root: new.block_root, - start_slot: new.slot, - max_headers: (res.roots.len() - i) as u64, - skip_slots: 0, - }, - network, - ) + // The wire protocol specifies that slots must be in ascending order. + if !res.slots_are_ascending() { + warn!( + self.log, + "Peer returned block roots response with bad slot ordering"; + "peer_id" => format!("{:?}", peer_id) + ); + return; } + + let new_roots = self + .import_queue + .enqueue_block_roots(&res.roots, peer_id.clone()); + + // No new roots means nothing to do. + // + // This check protects against future panics. + if new_roots.is_empty() { + return; + } + + // Determine the first (earliest) and last (latest) `BlockRootSlot` items. + // + // This logic relies upon slots to be in ascending order, which is enforced earlier. + let first = new_roots.first().expect("Non-empty list must have first"); + let last = new_roots.last().expect("Non-empty list must have last"); + + // Request all headers between the earliest and latest new `BlockRootSlot` items. + self.request_block_headers( + peer_id, + BeaconBlockHeadersRequest { + start_root: first.block_root, + start_slot: first.slot, + max_headers: (last.slot - first.slot + 1).as_u64(), + skip_slots: 0, + }, + network, + ) } /// Handle a `BeaconBlockHeaders` request from the peer. @@ -517,34 +541,148 @@ impl SimpleSync { } /// Process a gossip message declaring a new block. 
+ /// + /// Returns a `bool` which, if `true`, indicates we should forward the block to our peers. pub fn on_block_gossip( &mut self, peer_id: PeerId, - msg: BlockRootSlot, + block: BeaconBlock, network: &mut NetworkContext, - ) { + ) -> bool { info!( self.log, "NewGossipBlock"; "peer" => format!("{:?}", peer_id), ); - // TODO: filter out messages that a prior to the finalized slot. - // - // TODO: if the block is a few more slots ahead, try to get all block roots from then until - // now. - // - // Note: only requests the new block -- will fail if we don't have its parents. - if self.import_queue.is_new_block(&msg.block_root) { - self.request_block_headers( - peer_id, - BeaconBlockHeadersRequest { - start_root: msg.block_root, - start_slot: msg.slot, - max_headers: 1, - skip_slots: 0, - }, - network, - ) + + // Ignore any block from a finalized slot. + if self.slot_is_finalized(block.slot) { + warn!( + self.log, "NewGossipBlock"; + "msg" => "new block slot is finalized.", + "block_slot" => block.slot, + ); + return false; + } + + let block_root = Hash256::from_slice(&block.hash_tree_root()); + + // Ignore any block that the chain already knows about. + if self.chain_has_seen_block(&block_root) { + println!("this happened"); + // TODO: Age confirm that we shouldn't forward a block if we already know of it. + return false; + } + + debug!( + self.log, + "NewGossipBlock"; + "peer" => format!("{:?}", peer_id), + "msg" => "processing block", + ); + match self.chain.process_block(block.clone()) { + Ok(BlockProcessingOutcome::InvalidBlock(InvalidBlock::ParentUnknown)) => { + // The block was valid and we processed it successfully. + debug!( + self.log, "NewGossipBlock"; + "msg" => "parent block unknown", + "parent_root" => format!("{}", block.previous_block_root), + "peer" => format!("{:?}", peer_id), + ); + // Queue the block for later processing. 
+ self.import_queue + .enqueue_full_blocks(vec![block], peer_id.clone()); + // Send a hello to learn of the clients best slot so we can then sync the require + // parent(s). + network.send_rpc_request( + peer_id.clone(), + RPCRequest::Hello(self.chain.hello_message()), + ); + // Forward the block onto our peers. + // + // Note: this may need to be changed if we decide to only forward blocks if we have + // all required info. + true + } + Ok(BlockProcessingOutcome::InvalidBlock(InvalidBlock::FutureSlot { + present_slot, + block_slot, + })) => { + if block_slot - present_slot > FUTURE_SLOT_TOLERANCE { + // The block is too far in the future, drop it. + warn!( + self.log, "NewGossipBlock"; + "msg" => "future block rejected", + "present_slot" => present_slot, + "block_slot" => block_slot, + "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, + "peer" => format!("{:?}", peer_id), + ); + // Do not forward the block around to peers. + false + } else { + // The block is in the future, but not too far. + warn!( + self.log, "NewGossipBlock"; + "msg" => "queuing future block", + "present_slot" => present_slot, + "block_slot" => block_slot, + "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, + "peer" => format!("{:?}", peer_id), + ); + // Queue the block for later processing. + self.import_queue.enqueue_full_blocks(vec![block], peer_id); + // Forward the block around to peers. + true + } + } + Ok(outcome) => { + if outcome.is_invalid() { + // The peer has sent a block which is fundamentally invalid. + warn!( + self.log, "NewGossipBlock"; + "msg" => "invalid block from peer", + "outcome" => format!("{:?}", outcome), + "peer" => format!("{:?}", peer_id), + ); + // Disconnect the peer + network.disconnect(peer_id, GoodbyeReason::Fault); + // Do not forward the block to peers. + false + } else if outcome.sucessfully_processed() { + // The block was valid and we processed it successfully. 
+ info!( + self.log, "NewGossipBlock"; + "msg" => "block import successful", + "peer" => format!("{:?}", peer_id), + ); + // Forward the block to peers + true + } else { + // The block wasn't necessarily invalid but we didn't process it successfully. + // This condition shouldn't be reached. + error!( + self.log, "NewGossipBlock"; + "msg" => "unexpected condition in processing block.", + "outcome" => format!("{:?}", outcome), + ); + // Do not forward the block on. + false + } + } + Err(e) => { + // We encountered an error whilst processing the block. + // + // Blocks should not be able to trigger errors, instead they should be flagged as + // invalid. + error!( + self.log, "NewGossipBlock"; + "msg" => "internal error in processing block.", + "error" => format!("{:?}", e), + ); + // Do not forward the block to peers. + false + } } } @@ -563,12 +701,9 @@ impl SimpleSync { "peer" => format!("{:?}", peer_id), ); - // Awaiting a proper operations pool before we can import attestations. - // - // https://github.com/sigp/lighthouse/issues/281 match self.chain.process_attestation(msg) { - Ok(_) => panic!("Impossible, method not implemented."), - Err(_) => error!(self.log, "Attestation processing not implemented!"), + Ok(()) => info!(self.log, "ImportedAttestation"), + Err(e) => warn!(self.log, "InvalidAttestation"; "error" => format!("{:?}", e)), } } @@ -594,12 +729,21 @@ impl SimpleSync { "reason" => format!("{:?}", outcome), ); network.disconnect(sender, GoodbyeReason::Fault); + break; } // If this results to true, the item will be removed from the queue. 
if outcome.sucessfully_processed() { successful += 1; self.import_queue.remove(block_root); + } else { + debug!( + self.log, + "ProcessImportQueue"; + "msg" => "Block not imported", + "outcome" => format!("{:?}", outcome), + "peer" => format!("{:?}", sender), + ); } } Err(e) => { @@ -678,6 +822,26 @@ impl SimpleSync { network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockBodies(req)); } + /// Returns `true` if `self.chain` has not yet processed this block. + pub fn chain_has_seen_block(&self, block_root: &Hash256) -> bool { + !self + .chain + .is_new_block_root(&block_root) + .unwrap_or_else(|_| { + error!(self.log, "Unable to determine if block is new."); + false + }) + } + + /// Returns `true` if the given slot is finalized in our chain. + fn slot_is_finalized(&self, slot: Slot) -> bool { + slot <= self + .chain + .hello_message() + .latest_finalized_epoch + .start_slot(self.chain.get_spec().slots_per_epoch) + } + /// Generates our current state in the form of a HELLO RPC message. pub fn generate_hello(&self) -> HelloMessage { self.chain.hello_message() diff --git a/beacon_node/network/tests/tests.rs b/beacon_node/network/tests/tests.rs index 9cead1b557..47d5482d3e 100644 --- a/beacon_node/network/tests/tests.rs +++ b/beacon_node/network/tests/tests.rs @@ -543,7 +543,7 @@ fn sync_two_nodes() { // A provides block bodies to B. 
node_a.tee_block_body_response(&node_b); - std::thread::sleep(Duration::from_secs(10)); + std::thread::sleep(Duration::from_secs(20)); node_b.harness.run_fork_choice(); diff --git a/beacon_node/rpc/src/attestation.rs b/beacon_node/rpc/src/attestation.rs new file mode 100644 index 0000000000..abef49df1d --- /dev/null +++ b/beacon_node/rpc/src/attestation.rs @@ -0,0 +1,159 @@ +use crate::beacon_chain::BeaconChain; +use futures::Future; +use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; +use protos::services::{ + AttestationData as AttestationDataProto, ProduceAttestationDataRequest, + ProduceAttestationDataResponse, PublishAttestationRequest, PublishAttestationResponse, +}; +use protos::services_grpc::AttestationService; +use slog::{error, info, trace, warn}; +use ssz::{ssz_encode, Decodable}; +use std::sync::Arc; +use types::Attestation; + +#[derive(Clone)] +pub struct AttestationServiceInstance { + pub chain: Arc, + pub log: slog::Logger, +} + +impl AttestationService for AttestationServiceInstance { + /// Produce the `AttestationData` for signing by a validator. + fn produce_attestation_data( + &mut self, + ctx: RpcContext, + req: ProduceAttestationDataRequest, + sink: UnarySink, + ) { + trace!( + &self.log, + "Attempting to produce attestation at slot {}", + req.get_slot() + ); + + // verify the slot, drop lock on state afterwards + { + let slot_requested = req.get_slot(); + let state = self.chain.get_state(); + + // Start by performing some checks + // Check that the AttestionData is for the current slot (otherwise it will not be valid) + if slot_requested > state.slot.as_u64() { + let log_clone = self.log.clone(); + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::OutOfRange, + Some(format!( + "AttestationData request for a slot that is in the future." + )), + )) + .map_err(move |e| { + error!(log_clone, "Failed to reply with failure {:?}: {:?}", req, e) + }); + return ctx.spawn(f); + } + // currently cannot handle past slots. 
TODO: Handle this case + else if slot_requested < state.slot.as_u64() { + let log_clone = self.log.clone(); + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::InvalidArgument, + Some(format!( + "AttestationData request for a slot that is in the past." + )), + )) + .map_err(move |e| { + error!(log_clone, "Failed to reply with failure {:?}: {:?}", req, e) + }); + return ctx.spawn(f); + } + } + + // Then get the AttestationData from the beacon chain + let shard = req.get_shard(); + let attestation_data = match self.chain.produce_attestation_data(shard) { + Ok(v) => v, + Err(e) => { + // Could not produce an attestation + let log_clone = self.log.clone(); + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::Unknown, + Some(format!("Could not produce an attestation: {:?}", e)), + )) + .map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e)); + return ctx.spawn(f); + } + }; + + let mut attestation_data_proto = AttestationDataProto::new(); + attestation_data_proto.set_ssz(ssz_encode(&attestation_data)); + + let mut resp = ProduceAttestationDataResponse::new(); + resp.set_attestation_data(attestation_data_proto); + + let error_log = self.log.clone(); + let f = sink + .success(resp) + .map_err(move |e| error!(error_log, "Failed to reply with success {:?}: {:?}", req, e)); + ctx.spawn(f) + } + + /// Accept some fully-formed `FreeAttestation` from the validator, + /// store it, and aggregate it into an `Attestation`. 
+ fn publish_attestation( + &mut self, + ctx: RpcContext, + req: PublishAttestationRequest, + sink: UnarySink, + ) { + trace!(self.log, "Publishing attestation"); + + let mut resp = PublishAttestationResponse::new(); + let ssz_serialized_attestation = req.get_attestation().get_ssz(); + + let attestation = match Attestation::ssz_decode(ssz_serialized_attestation, 0) { + Ok((v, _index)) => v, + Err(_) => { + let log_clone = self.log.clone(); + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::InvalidArgument, + Some("Invalid attestation".to_string()), + )) + .map_err(move |_| warn!(log_clone, "failed to reply {:?}", req)); + return ctx.spawn(f); + } + }; + + match self.chain.process_attestation(attestation) { + Ok(_) => { + // Attestation was successfully processed. + info!( + self.log, + "PublishAttestation"; + "type" => "valid_attestation", + ); + + resp.set_success(true); + } + Err(e) => { + // Attestation was invalid + warn!( + self.log, + "PublishAttestation"; + "type" => "invalid_attestation", + "error" => format!("{:?}", e), + ); + resp.set_success(false); + resp.set_msg(format!("InvalidAttestation: {:?}", e).as_bytes().to_vec()); + } + }; + + let error_log = self.log.clone(); + let f = sink + .success(resp) + .map_err(move |e| error!(error_log, "failed to reply {:?}: {:?}", req, e)); + ctx.spawn(f) + } +} diff --git a/beacon_node/rpc/src/beacon_attester.rs b/beacon_node/rpc/src/beacon_attester.rs deleted file mode 100644 index 36b6a40b24..0000000000 --- a/beacon_node/rpc/src/beacon_attester.rs +++ /dev/null @@ -1,61 +0,0 @@ -use futures::Future; -use grpcio::{RpcContext, UnarySink}; -use protos::services::{ - Attestation as AttestationProto, ProduceAttestation, ProduceAttestationResponse, - ProduceAttestationRequest, PublishAttestationResponse, PublishAttestationRequest, - PublishAttestation -}; -use protos::services_grpc::BeaconBlockService; -use slog::Logger; - -#[derive(Clone)] -pub struct AttestationServiceInstance { - pub log: Logger, -} - -impl 
AttestationService for AttestationServiceInstance { - /// Produce a `BeaconBlock` for signing by a validator. - fn produce_attestation( - &mut self, - ctx: RpcContext, - req: ProduceAttestationRequest, - sink: UnarySink, - ) { - println!("producing attestation at slot {}", req.get_slot()); - - // TODO: build a legit block. - let mut attestation = AttestationProto::new(); - attestation.set_slot(req.get_slot()); - // TODO Set the shard to something legit. - attestation.set_shard(0); - attestation.set_block_root(b"cats".to_vec()); - - let mut resp = ProduceAttestationResponse::new(); - resp.set_attestation_data(attestation); - - let f = sink - .success(resp) - .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); - ctx.spawn(f) - } - - /// Accept some fully-formed `BeaconBlock`, process and publish it. - fn publish_attestation( - &mut self, - ctx: RpcContext, - req: PublishAttestationRequest, - sink: UnarySink, - ) { - println!("publishing attestation {:?}", req.get_block()); - - // TODO: actually process the block. 
- let mut resp = PublishAttestationResponse::new(); - - resp.set_success(true); - - let f = sink - .success(resp) - .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); - ctx.spawn(f) - } -} diff --git a/beacon_node/rpc/src/beacon_block.rs b/beacon_node/rpc/src/beacon_block.rs index f6b426c18f..0b90f774a5 100644 --- a/beacon_node/rpc/src/beacon_block.rs +++ b/beacon_node/rpc/src/beacon_block.rs @@ -1,9 +1,8 @@ use crate::beacon_chain::BeaconChain; use crossbeam_channel; -use eth2_libp2p::rpc::methods::BlockRootSlot; use eth2_libp2p::PubsubMessage; use futures::Future; -use grpcio::{RpcContext, UnarySink}; +use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use network::NetworkMessage; use protos::services::{ BeaconBlock as BeaconBlockProto, ProduceBeaconBlockRequest, ProduceBeaconBlockResponse, @@ -11,10 +10,10 @@ use protos::services::{ }; use protos::services_grpc::BeaconBlockService; use slog::Logger; -use slog::{debug, error, info, warn}; -use ssz::{Decodable, TreeHash}; +use slog::{error, info, trace, warn}; +use ssz::{ssz_encode, Decodable}; use std::sync::Arc; -use types::{BeaconBlock, Hash256, Slot}; +use types::{BeaconBlock, Signature, Slot}; #[derive(Clone)] pub struct BeaconBlockServiceInstance { @@ -31,11 +30,44 @@ impl BeaconBlockService for BeaconBlockServiceInstance { req: ProduceBeaconBlockRequest, sink: UnarySink, ) { - println!("producing at slot {}", req.get_slot()); + trace!(self.log, "Generating a beacon block"; "req" => format!("{:?}", req)); + + // decode the request + // TODO: requested slot currently unused, see: https://github.com/sigp/lighthouse/issues/336 + let _requested_slot = Slot::from(req.get_slot()); + let randao_reveal = match Signature::ssz_decode(req.get_randao_reveal(), 0) { + Ok((reveal, _index)) => reveal, + Err(_) => { + // decode error, incorrect signature + let log_clone = self.log.clone(); + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::InvalidArgument, + Some(format!("Invalid randao 
reveal signature")), + )) + .map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e)); + return ctx.spawn(f); + } + }; + + let produced_block = match self.chain.produce_block(randao_reveal) { + Ok((block, _state)) => block, + Err(e) => { + // could not produce a block + let log_clone = self.log.clone(); + warn!(self.log, "RPC Error"; "Error" => format!("Could not produce a block:{:?}",e)); + let f = sink + .fail(RpcStatus::new( + RpcStatusCode::Unknown, + Some(format!("Could not produce a block: {:?}", e)), + )) + .map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e)); + return ctx.spawn(f); + } + }; - // TODO: build a legit block. let mut block = BeaconBlockProto::new(); - block.set_ssz(b"cats".to_vec()); + block.set_ssz(ssz_encode(&produced_block)); let mut resp = ProduceBeaconBlockResponse::new(); resp.set_block(block); @@ -53,14 +85,14 @@ impl BeaconBlockService for BeaconBlockServiceInstance { req: PublishBeaconBlockRequest, sink: UnarySink, ) { + trace!(&self.log, "Attempting to publish a block"); + let mut resp = PublishBeaconBlockResponse::new(); let ssz_serialized_block = req.get_block().get_ssz(); match BeaconBlock::ssz_decode(ssz_serialized_block, 0) { Ok((block, _i)) => { - let block_root = Hash256::from_slice(&block.hash_tree_root()[..]); - match self.chain.process_block(block.clone()) { Ok(outcome) => { if outcome.sucessfully_processed() { @@ -76,16 +108,22 @@ impl BeaconBlockService for BeaconBlockServiceInstance { // TODO: Obtain topics from the network service properly. let topic = types::TopicBuilder::new("beacon_chain".to_string()).build(); - let message = PubsubMessage::Block(BlockRootSlot { - block_root, - slot: block.slot, - }); + let message = PubsubMessage::Block(block); - println!("Sending beacon block to gossipsub"); - self.network_chan.send(NetworkMessage::Publish { - topics: vec![topic], - message, - }); + // Publish the block to the p2p network via gossipsub. 
+ self.network_chan + .send(NetworkMessage::Publish { + topics: vec![topic], + message, + }) + .unwrap_or_else(|e| { + error!( + self.log, + "PublishBeaconBlock"; + "type" => "failed to publish to gossipsub", + "error" => format!("{:?}", e) + ); + }); resp.set_success(true); } else if outcome.is_invalid() { diff --git a/beacon_node/rpc/src/beacon_chain.rs b/beacon_node/rpc/src/beacon_chain.rs index 0551a80246..ddc91b73cc 100644 --- a/beacon_node/rpc/src/beacon_chain.rs +++ b/beacon_node/rpc/src/beacon_chain.rs @@ -2,12 +2,13 @@ use beacon_chain::BeaconChain as RawBeaconChain; use beacon_chain::{ db::ClientDB, fork_choice::ForkChoice, - parking_lot::RwLockReadGuard, + parking_lot::{RwLockReadGuard, RwLockWriteGuard}, slot_clock::SlotClock, - types::{BeaconState, ChainSpec}, + types::{BeaconState, ChainSpec, Signature}, + AttestationValidationError, BlockProductionError, }; pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome}; -use types::BeaconBlock; +use types::{Attestation, AttestationData, BeaconBlock}; /// The RPC's API to the beacon chain. 
pub trait BeaconChain: Send + Sync { @@ -15,8 +16,22 @@ pub trait BeaconChain: Send + Sync { fn get_state(&self) -> RwLockReadGuard; + fn get_mut_state(&self) -> RwLockWriteGuard; + fn process_block(&self, block: BeaconBlock) -> Result; + + fn produce_block( + &self, + randao_reveal: Signature, + ) -> Result<(BeaconBlock, BeaconState), BlockProductionError>; + + fn produce_attestation_data(&self, shard: u64) -> Result; + + fn process_attestation( + &self, + attestation: Attestation, + ) -> Result<(), AttestationValidationError>; } impl BeaconChain for RawBeaconChain @@ -33,10 +48,32 @@ where self.state.read() } + fn get_mut_state(&self) -> RwLockWriteGuard { + self.state.write() + } + fn process_block( &self, block: BeaconBlock, ) -> Result { self.process_block(block) } + + fn produce_block( + &self, + randao_reveal: Signature, + ) -> Result<(BeaconBlock, BeaconState), BlockProductionError> { + self.produce_block(randao_reveal) + } + + fn produce_attestation_data(&self, shard: u64) -> Result { + self.produce_attestation_data(shard) + } + + fn process_attestation( + &self, + attestation: Attestation, + ) -> Result<(), AttestationValidationError> { + self.process_attestation(attestation) + } } diff --git a/beacon_node/rpc/src/lib.rs b/beacon_node/rpc/src/lib.rs index 2d47b4a69a..5aac4ce558 100644 --- a/beacon_node/rpc/src/lib.rs +++ b/beacon_node/rpc/src/lib.rs @@ -1,19 +1,22 @@ +mod attestation; mod beacon_block; pub mod beacon_chain; mod beacon_node; pub mod config; mod validator; +use self::attestation::AttestationServiceInstance; use self::beacon_block::BeaconBlockServiceInstance; use self::beacon_chain::BeaconChain; use self::beacon_node::BeaconNodeServiceInstance; use self::validator::ValidatorServiceInstance; pub use config::Config as RPCConfig; -use futures::{future, Future}; -use grpcio::{Environment, Server, ServerBuilder}; +use futures::Future; +use grpcio::{Environment, ServerBuilder}; use network::NetworkMessage; use protos::services_grpc::{ - 
create_beacon_block_service, create_beacon_node_service, create_validator_service, + create_attestation_service, create_beacon_block_service, create_beacon_node_service, + create_validator_service, }; use slog::{info, o, warn}; use std::sync::Arc; @@ -56,11 +59,19 @@ pub fn start_server( }; create_validator_service(instance) }; + let attestation_service = { + let instance = AttestationServiceInstance { + chain: beacon_chain.clone(), + log: log.clone(), + }; + create_attestation_service(instance) + }; let mut server = ServerBuilder::new(env) .register_service(beacon_block_service) .register_service(validator_service) .register_service(beacon_node_service) + .register_service(attestation_service) .bind(config.listen_address.to_string(), config.port) .build() .unwrap(); diff --git a/beacon_node/rpc/src/validator.rs b/beacon_node/rpc/src/validator.rs index 29ea83f4b8..4bef1e2e63 100644 --- a/beacon_node/rpc/src/validator.rs +++ b/beacon_node/rpc/src/validator.rs @@ -4,7 +4,7 @@ use futures::Future; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use protos::services::{ActiveValidator, GetDutiesRequest, GetDutiesResponse, ValidatorDuty}; use protos::services_grpc::ValidatorService; -use slog::{debug, info, warn, Logger}; +use slog::{trace, warn}; use ssz::decode; use std::sync::Arc; use types::{Epoch, RelativeEpoch}; @@ -12,7 +12,7 @@ use types::{Epoch, RelativeEpoch}; #[derive(Clone)] pub struct ValidatorServiceInstance { pub chain: Arc, - pub log: Logger, + pub log: slog::Logger, } //TODO: Refactor Errors @@ -27,14 +27,13 @@ impl ValidatorService for ValidatorServiceInstance { sink: UnarySink, ) { let validators = req.get_validators(); - debug!(self.log, "RPC request"; "endpoint" => "GetValidatorDuties", "epoch" => req.get_epoch()); - - let epoch = Epoch::from(req.get_epoch()); - let mut resp = GetDutiesResponse::new(); - let resp_validators = resp.mut_active_validators(); + trace!(self.log, "RPC request"; "endpoint" => "GetValidatorDuties", "epoch" => 
req.get_epoch()); let spec = self.chain.get_spec(); let state = self.chain.get_state(); + let epoch = Epoch::from(req.get_epoch()); + let mut resp = GetDutiesResponse::new(); + let resp_validators = resp.mut_active_validators(); let relative_epoch = match RelativeEpoch::from_epoch(state.slot.epoch(spec.slots_per_epoch), epoch) { @@ -84,7 +83,7 @@ impl ValidatorService for ValidatorServiceInstance { RpcStatusCode::InvalidArgument, Some("Invalid public_key".to_string()), )) - .map_err(move |e| warn!(log_clone, "failed to reply {:?}", req)); + .map_err(move |_| warn!(log_clone, "failed to reply {:?}", req)); return ctx.spawn(f); } }; @@ -157,6 +156,7 @@ impl ValidatorService for ValidatorServiceInstance { duty.set_committee_index(attestation_duties.committee_index as u64); duty.set_attestation_slot(attestation_duties.slot.as_u64()); duty.set_attestation_shard(attestation_duties.shard); + duty.set_committee_len(attestation_duties.committee_len as u64); active_validator.set_duty(duty); resp_validators.push(active_validator); diff --git a/beacon_node/src/main.rs b/beacon_node/src/main.rs index ea74c73766..45aafb3ce5 100644 --- a/beacon_node/src/main.rs +++ b/beacon_node/src/main.rs @@ -16,6 +16,7 @@ fn main() { .version(version::version().as_str()) .author("Sigma Prime ") .about("Eth 2.0 Client") + // file system related arguments .arg( Arg::with_name("datadir") .long("datadir") @@ -23,8 +24,9 @@ fn main() { .help("Data directory for keys and databases.") .takes_value(true), ) + // network related arguments .arg( - Arg::with_name("listen_address") + Arg::with_name("listen-address") .long("listen-address") .value_name("Listen Address") .help("The Network address to listen for p2p connections.") @@ -37,6 +39,14 @@ fn main() { .help("Network listen port for p2p connections.") .takes_value(true), ) + .arg( + Arg::with_name("boot-nodes") + .long("boot-nodes") + .value_name("BOOTNODES") + .help("A list of comma separated multi addresses representing bootnodes to connect to.") 
+ .takes_value(true), + ) + // rpc related arguments .arg( Arg::with_name("rpc") .long("rpc") diff --git a/eth2/attester/src/lib.rs b/eth2/attester/src/lib.rs index 270c1e4d79..a4295f0052 100644 --- a/eth2/attester/src/lib.rs +++ b/eth2/attester/src/lib.rs @@ -94,7 +94,7 @@ impl Attester Result { - let attestation_data = match self.beacon_node.produce_attestation(slot, shard)? { + let attestation_data = match self.beacon_node.produce_attestation_data(slot, shard)? { Some(attestation_data) => attestation_data, None => return Ok(PollOutcome::BeaconNodeUnableToProduceAttestation(slot)), }; diff --git a/eth2/attester/src/test_utils/simulated_beacon_node.rs b/eth2/attester/src/test_utils/simulated_beacon_node.rs index 84a203cdb5..d19f434223 100644 --- a/eth2/attester/src/test_utils/simulated_beacon_node.rs +++ b/eth2/attester/src/test_utils/simulated_beacon_node.rs @@ -26,7 +26,7 @@ impl SimulatedBeaconNode { } impl BeaconNode for SimulatedBeaconNode { - fn produce_attestation(&self, slot: Slot, shard: u64) -> ProduceResult { + fn produce_attestation_data(&self, slot: Slot, shard: u64) -> ProduceResult { *self.produce_input.write().unwrap() = Some((slot, shard)); match *self.produce_result.read().unwrap() { Some(ref r) => r.clone(), diff --git a/eth2/attester/src/traits.rs b/eth2/attester/src/traits.rs index 749c6e1a2a..2fd6940af2 100644 --- a/eth2/attester/src/traits.rs +++ b/eth2/attester/src/traits.rs @@ -14,7 +14,7 @@ pub enum PublishOutcome { /// Defines the methods required to produce and publish blocks on a Beacon Node. 
pub trait BeaconNode: Send + Sync { - fn produce_attestation( + fn produce_attestation_data( &self, slot: Slot, shard: u64, diff --git a/eth2/operation_pool/src/lib.rs b/eth2/operation_pool/src/lib.rs index 0c5d78fe49..41c41c3ff7 100644 --- a/eth2/operation_pool/src/lib.rs +++ b/eth2/operation_pool/src/lib.rs @@ -183,6 +183,8 @@ impl OperationPool { || key.domain_bytes_match(&curr_domain_bytes) }) .flat_map(|(_, attestations)| attestations) + // That are not superseded by an attestation included in the state... + .filter(|attestation| !superior_attestation_exists_in_state(state, attestation)) // That are valid... .filter(|attestation| validate_attestation(state, attestation, spec).is_ok()) // Scored by the number of new attestations they introduce (descending) @@ -475,6 +477,31 @@ impl OperationPool { } } +/// Returns `true` if the state already contains a `PendingAttestation` that is superior to the +/// given `attestation`. +/// +/// A validator has nothing to gain from re-including an attestation and it adds load to the +/// network. +/// +/// An existing `PendingAttestation` is superior to an existing `attestation` if: +/// +/// - Their `AttestationData` is equal. +/// - `attestation` does not contain any signatures that `PendingAttestation` does not have. +fn superior_attestation_exists_in_state(state: &BeaconState, attestation: &Attestation) -> bool { + state + .current_epoch_attestations + .iter() + .chain(state.previous_epoch_attestations.iter()) + .any(|existing_attestation| { + let bitfield = &attestation.aggregation_bitfield; + let existing_bitfield = &existing_attestation.aggregation_bitfield; + + existing_attestation.data == attestation.data + && bitfield.intersection(existing_bitfield).num_set_bits() + == bitfield.num_set_bits() + }) +} + /// Filter up to a maximum number of operations out of an iterator. 
fn filter_limit_operations<'a, T: 'a, I, F>(operations: I, filter: F, limit: u64) -> Vec where diff --git a/eth2/state_processing/src/per_epoch_processing/validator_statuses.rs b/eth2/state_processing/src/per_epoch_processing/validator_statuses.rs index 50f3ec3727..02149cc5a1 100644 --- a/eth2/state_processing/src/per_epoch_processing/validator_statuses.rs +++ b/eth2/state_processing/src/per_epoch_processing/validator_statuses.rs @@ -227,7 +227,7 @@ impl ValidatorStatuses { status.is_previous_epoch_attester = true; // The inclusion slot and distance are only required for previous epoch attesters. - let relative_epoch = RelativeEpoch::from_slot(state.slot, a.data.slot, spec)?; + let relative_epoch = RelativeEpoch::from_slot(state.slot, a.inclusion_slot, spec)?; status.inclusion_info = Some(InclusionInfo { slot: a.inclusion_slot, distance: inclusion_distance(a), diff --git a/eth2/types/src/beacon_state.rs b/eth2/types/src/beacon_state.rs index 1e52781243..774e8eb761 100644 --- a/eth2/types/src/beacon_state.rs +++ b/eth2/types/src/beacon_state.rs @@ -661,6 +661,17 @@ impl BeaconState { }) } + /// Build all the caches, if they need to be built. + pub fn build_all_caches(&mut self, spec: &ChainSpec) -> Result<(), Error> { + self.build_epoch_cache(RelativeEpoch::Previous, spec)?; + self.build_epoch_cache(RelativeEpoch::Current, spec)?; + self.build_epoch_cache(RelativeEpoch::NextWithoutRegistryChange, spec)?; + self.build_epoch_cache(RelativeEpoch::NextWithRegistryChange, spec)?; + self.update_pubkey_cache()?; + + Ok(()) + } + /// Build an epoch cache, unless it is has already been built. 
pub fn build_epoch_cache( &mut self, diff --git a/eth2/types/src/slot_epoch.rs b/eth2/types/src/slot_epoch.rs index f0dac5ac0c..d334177e58 100644 --- a/eth2/types/src/slot_epoch.rs +++ b/eth2/types/src/slot_epoch.rs @@ -113,6 +113,16 @@ mod epoch_tests { all_tests!(Epoch); + #[test] + fn epoch_start_end() { + let slots_per_epoch = 8; + + let epoch = Epoch::new(0); + + assert_eq!(epoch.start_slot(slots_per_epoch), Slot::new(0)); + assert_eq!(epoch.end_slot(slots_per_epoch), Slot::new(7)); + } + #[test] fn slot_iter() { let slots_per_epoch = 8; diff --git a/eth2/types/src/test_utils/testing_beacon_state_builder.rs b/eth2/types/src/test_utils/testing_beacon_state_builder.rs index b38e8b5273..445debae7c 100644 --- a/eth2/types/src/test_utils/testing_beacon_state_builder.rs +++ b/eth2/types/src/test_utils/testing_beacon_state_builder.rs @@ -6,6 +6,8 @@ use dirs; use log::debug; use rayon::prelude::*; use std::path::{Path, PathBuf}; +//TODO: testing only +use std::time::{Duration, SystemTime}; pub const KEYPAIRS_FILE: &str = "keypairs.raw_keypairs"; @@ -120,7 +122,17 @@ impl TestingBeaconStateBuilder { }) .collect(); - let genesis_time = 1553753928; // arbitrary + // TODO: Testing only. Burn with fire later. + // set genesis to the last 30 minute block. + // this is used for testing only. Allows multiple nodes to connect within a 30min window + // and agree on a genesis + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + let secs_after_last_period = now.checked_rem(30 * 60).unwrap_or(0); + // genesis is now the last 30 minute block. 
+ let genesis_time = now - secs_after_last_period; let mut state = BeaconState::genesis( genesis_time, diff --git a/eth2/utils/bls/src/keypair.rs b/eth2/utils/bls/src/keypair.rs index 6feb2a5856..2f0e794a61 100644 --- a/eth2/utils/bls/src/keypair.rs +++ b/eth2/utils/bls/src/keypair.rs @@ -1,5 +1,7 @@ use super::{PublicKey, SecretKey}; use serde_derive::{Deserialize, Serialize}; +use std::fmt; +use std::hash::{Hash, Hasher}; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Keypair { @@ -19,3 +21,21 @@ impl Keypair { self.pk.concatenated_hex_id() } } + +impl Hash for Keypair { + /// Note: this is distinct from consensus serialization, it will produce a different hash. + /// + /// This method uses the uncompressed bytes, which are much faster to obtain than the + /// compressed bytes required for consensus serialization. + /// + /// Use `ssz::Encode` to obtain the bytes required for consensus hashing. + fn hash(&self, state: &mut H) { + self.pk.as_uncompressed_bytes().hash(state) + } +} + +impl fmt::Display for Keypair { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.pk) + } +} diff --git a/eth2/utils/bls/src/public_key.rs b/eth2/utils/bls/src/public_key.rs index 98ff40d71b..177a735c42 100644 --- a/eth2/utils/bls/src/public_key.rs +++ b/eth2/utils/bls/src/public_key.rs @@ -5,6 +5,7 @@ use serde::ser::{Serialize, Serializer}; use serde_hex::{encode as hex_encode, HexVisitor}; use ssz::{decode, hash, ssz_encode, Decodable, DecodeError, Encodable, SszStream, TreeHash}; use std::default; +use std::fmt; use std::hash::{Hash, Hasher}; /// A single BLS signature. 
@@ -52,6 +53,12 @@ impl PublicKey { } } +impl fmt::Display for PublicKey { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.concatenated_hex_id()) + } +} + impl default::Default for PublicKey { fn default() -> Self { let secret_key = SecretKey::random(); diff --git a/protos/src/services.proto b/protos/src/services.proto index dd82855a19..ecc75ee264 100644 --- a/protos/src/services.proto +++ b/protos/src/services.proto @@ -19,7 +19,9 @@ service BeaconNodeService { /// Service that handles block production service BeaconBlockService { + // Requests a block to be signed from the beacon node. rpc ProduceBeaconBlock(ProduceBeaconBlockRequest) returns (ProduceBeaconBlockResponse); + // Responds to the node the signed block to be published. rpc PublishBeaconBlock(PublishBeaconBlockRequest) returns (PublishBeaconBlockResponse); } @@ -33,7 +35,7 @@ service ValidatorService { /// Service that handles validator attestations service AttestationService { - rpc ProduceAttestation(ProduceAttestationRequest) returns (ProduceAttestationResponse); + rpc ProduceAttestationData(ProduceAttestationDataRequest) returns (ProduceAttestationDataResponse); rpc PublishAttestation(PublishAttestationRequest) returns (PublishAttestationResponse); } @@ -64,6 +66,7 @@ message Empty {} // Validator requests an unsigned proposal. message ProduceBeaconBlockRequest { uint64 slot = 1; + bytes randao_reveal = 2; } // Beacon node returns an unsigned proposal. 
@@ -122,23 +125,28 @@ message ValidatorDuty { uint64 attestation_slot = 3; uint64 attestation_shard = 4; uint64 committee_index = 5; + uint64 committee_len = 6; } /* * Attestation Service Messages */ -message ProduceAttestationRequest { +message ProduceAttestationDataRequest { uint64 slot = 1; uint64 shard = 2; } -message ProduceAttestationResponse { - Attestation attestation_data = 1; +message ProduceAttestationDataResponse { + AttestationData attestation_data = 1; } message PublishAttestationRequest { - FreeAttestation free_attestation = 1; + Attestation attestation = 1; +} + +message Attestation { + bytes ssz = 1; } message PublishAttestationResponse { @@ -146,26 +154,6 @@ message PublishAttestationResponse { bytes msg = 2; } -message Crosslink { - uint64 epoch = 1; - bytes crosslink_data_root = 2; - -} - -message Attestation { - uint64 slot = 1; - uint64 shard = 2; - bytes beacon_block_root = 3; - bytes epoch_boundary_root = 4; - bytes crosslink_data_root = 5; - Crosslink latest_crosslink = 6; - uint64 justified_epoch = 7; - bytes justified_block_root = 8; - -} - -message FreeAttestation { - Attestation attestation_data = 1; - bytes signature = 2; - uint64 validator_index = 3; +message AttestationData { + bytes ssz = 1; } diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index 570e06d74c..80477c8eaa 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -12,7 +12,6 @@ path = "src/main.rs" name = "validator_client" path = "src/lib.rs" - [dependencies] block_proposer = { path = "../eth2/block_proposer" } attester = { path = "../eth2/attester" } diff --git a/validator_client/src/attestation_producer/beacon_node_attestation.rs b/validator_client/src/attestation_producer/beacon_node_attestation.rs new file mode 100644 index 0000000000..b5ff777de8 --- /dev/null +++ b/validator_client/src/attestation_producer/beacon_node_attestation.rs @@ -0,0 +1,23 @@ +//TODO: generalise these enums to the crate +use 
crate::block_producer::{BeaconNodeError, PublishOutcome}; +use types::{Attestation, AttestationData, Slot}; + +/// Defines the methods required to produce and publish attestations on a Beacon Node. Abstracts the +/// actual beacon node. +pub trait BeaconNodeAttestation: Send + Sync { + /// Request that the node produces the required attestation data. + /// + fn produce_attestation_data( + &self, + slot: Slot, + shard: u64, + ) -> Result; + + /// Request that the node publishes a attestation. + /// + /// Returns `true` if the publish was successful. + fn publish_attestation( + &self, + attestation: Attestation, + ) -> Result; +} diff --git a/validator_client/src/attestation_producer/grpc.rs b/validator_client/src/attestation_producer/grpc.rs new file mode 100644 index 0000000000..900a92f321 --- /dev/null +++ b/validator_client/src/attestation_producer/grpc.rs @@ -0,0 +1,57 @@ +use super::beacon_node_attestation::BeaconNodeAttestation; +use crate::block_producer::{BeaconNodeError, PublishOutcome}; +use protos::services_grpc::AttestationServiceClient; +use ssz::{ssz_encode, Decodable}; + +use protos::services::{ + Attestation as GrpcAttestation, ProduceAttestationDataRequest, PublishAttestationRequest, +}; +use types::{Attestation, AttestationData, Slot}; + +impl BeaconNodeAttestation for AttestationServiceClient { + fn produce_attestation_data( + &self, + slot: Slot, + shard: u64, + ) -> Result { + let mut req = ProduceAttestationDataRequest::new(); + req.set_slot(slot.as_u64()); + req.set_shard(shard); + + let reply = self + .produce_attestation_data(&req) + .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; + + let (attestation_data, _index) = + AttestationData::ssz_decode(reply.get_attestation_data().get_ssz(), 0) + .map_err(|_| BeaconNodeError::DecodeFailure)?; + Ok(attestation_data) + } + + fn publish_attestation( + &self, + attestation: Attestation, + ) -> Result { + let mut req = PublishAttestationRequest::new(); + + let ssz = 
ssz_encode(&attestation); + + let mut grpc_attestation = GrpcAttestation::new(); + grpc_attestation.set_ssz(ssz); + + req.set_attestation(grpc_attestation); + + let reply = self + .publish_attestation(&req) + .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; + + if reply.get_success() { + Ok(PublishOutcome::Valid) + } else { + // TODO: distinguish between different errors + Ok(PublishOutcome::InvalidAttestation( + "Publish failed".to_string(), + )) + } + } +} diff --git a/validator_client/src/attestation_producer/mod.rs b/validator_client/src/attestation_producer/mod.rs new file mode 100644 index 0000000000..0fbc7bcbaa --- /dev/null +++ b/validator_client/src/attestation_producer/mod.rs @@ -0,0 +1,165 @@ +mod beacon_node_attestation; +mod grpc; + +use std::sync::Arc; +use types::{ChainSpec, Domain, Fork}; +//TODO: Move these higher up in the crate +use super::block_producer::{BeaconNodeError, PublishOutcome, ValidatorEvent}; +use crate::signer::Signer; +use beacon_node_attestation::BeaconNodeAttestation; +use slog::{error, info, warn}; +use ssz::TreeHash; +use types::{ + AggregateSignature, Attestation, AttestationData, AttestationDataAndCustodyBit, + AttestationDuty, Bitfield, +}; + +//TODO: Group these errors at a crate level +#[derive(Debug, PartialEq)] +pub enum Error { + BeaconNodeError(BeaconNodeError), +} + +impl From for Error { + fn from(e: BeaconNodeError) -> Error { + Error::BeaconNodeError(e) + } +} + +/// This struct contains the logic for requesting and signing beacon attestations for a validator. The +/// validator can abstractly sign via the Signer trait object. +pub struct AttestationProducer<'a, B: BeaconNodeAttestation, S: Signer> { + /// The current fork. + pub fork: Fork, + /// The attestation duty to perform. + pub duty: AttestationDuty, + /// The current epoch. + pub spec: Arc, + /// The beacon node to connect to. + pub beacon_node: Arc, + /// The signer to sign the block. 
+ pub signer: &'a S, +} + +impl<'a, B: BeaconNodeAttestation, S: Signer> AttestationProducer<'a, B, S> { + /// Handle outputs and results from attestation production. + pub fn handle_produce_attestation(&mut self, log: slog::Logger) { + match self.produce_attestation() { + Ok(ValidatorEvent::AttestationProduced(_slot)) => { + info!(log, "Attestation produced"; "Validator" => format!("{}", self.signer)) + } + Err(e) => error!(log, "Attestation production error"; "Error" => format!("{:?}", e)), + Ok(ValidatorEvent::SignerRejection(_slot)) => { + error!(log, "Attestation production error"; "Error" => "Signer could not sign the attestation".to_string()) + } + Ok(ValidatorEvent::SlashableAttestationNotProduced(_slot)) => { + error!(log, "Attestation production error"; "Error" => "Rejected the attestation as it could have been slashed".to_string()) + } + Ok(ValidatorEvent::PublishAttestationFailed) => { + error!(log, "Attestation production error"; "Error" => "Beacon node was unable to publish an attestation".to_string()) + } + Ok(ValidatorEvent::InvalidAttestation) => { + error!(log, "Attestation production error"; "Error" => "The signed attestation was invalid".to_string()) + } + Ok(v) => { + warn!(log, "Unknown result for attestation production"; "Error" => format!("{:?}",v)) + } + } + } + + /// Produce an attestation, sign it and send it back + /// + /// Assumes that an attestation is required at this slot (does not check the duties). + /// + /// Ensures the message is not slashable. + /// + /// !!! UNSAFE !!! + /// + /// The slash-protection code is not yet implemented. There is zero protection against + /// slashing. 
+ pub fn produce_attestation(&mut self) -> Result { + let epoch = self.duty.slot.epoch(self.spec.slots_per_epoch); + + let attestation = self + .beacon_node + .produce_attestation_data(self.duty.slot, self.duty.shard)?; + if self.safe_to_produce(&attestation) { + let domain = self.spec.get_domain(epoch, Domain::Attestation, &self.fork); + if let Some(attestation) = self.sign_attestation(attestation, self.duty, domain) { + match self.beacon_node.publish_attestation(attestation) { + Ok(PublishOutcome::InvalidAttestation(_string)) => { + Ok(ValidatorEvent::InvalidAttestation) + } + Ok(PublishOutcome::Valid) => { + Ok(ValidatorEvent::AttestationProduced(self.duty.slot)) + } + Err(_) | Ok(_) => Ok(ValidatorEvent::PublishAttestationFailed), + } + } else { + Ok(ValidatorEvent::SignerRejection(self.duty.slot)) + } + } else { + Ok(ValidatorEvent::SlashableAttestationNotProduced( + self.duty.slot, + )) + } + } + + /// Consumes an attestation, returning the attestation signed by the validators private key. + /// + /// Important: this function will not check to ensure the attestation is not slashable. This must be + /// done upstream. 
+ fn sign_attestation( + &mut self, + attestation: AttestationData, + duties: AttestationDuty, + domain: u64, + ) -> Option { + self.store_produce(&attestation); + + // build the aggregate signature + let aggregate_signature = { + let message = AttestationDataAndCustodyBit { + data: attestation.clone(), + custody_bit: false, + } + .hash_tree_root(); + + let sig = self.signer.sign_message(&message, domain)?; + + let mut agg_sig = AggregateSignature::new(); + agg_sig.add(&sig); + agg_sig + }; + + let mut aggregation_bitfield = Bitfield::with_capacity(duties.committee_len); + let custody_bitfield = Bitfield::with_capacity(duties.committee_len); + aggregation_bitfield.set(duties.committee_index, true); + + Some(Attestation { + aggregation_bitfield, + data: attestation, + custody_bitfield, + aggregate_signature, + }) + } + + /// Returns `true` if signing an attestation is safe (non-slashable). + /// + /// !!! UNSAFE !!! + /// + /// Important: this function is presently stubbed-out. It provides ZERO SAFETY. + fn safe_to_produce(&self, _attestation: &AttestationData) -> bool { + //TODO: Implement slash protection + true + } + + /// Record that an attestation was produced so that slashable votes may not be made in the future. + /// + /// !!! UNSAFE !!! + /// + /// Important: this function is presently stubbed-out. It provides ZERO SAFETY. 
+ fn store_produce(&mut self, _attestation: &AttestationData) { + // TODO: Implement slash protection + } +} diff --git a/validator_client/src/attester_service/attestation_grpc_client.rs b/validator_client/src/attester_service/attestation_grpc_client.rs deleted file mode 100644 index 5a4701ba93..0000000000 --- a/validator_client/src/attester_service/attestation_grpc_client.rs +++ /dev/null @@ -1,44 +0,0 @@ -use protos::services_grpc::AttestationServiceClient; -use std::sync::Arc; - -use attester::{BeaconNode, BeaconNodeError, PublishOutcome}; -use protos::services::ProduceAttestationRequest; -use types::{AttestationData, FreeAttestation, Slot}; - -pub struct AttestationGrpcClient { - client: Arc, -} - -impl AttestationGrpcClient { - pub fn new(client: Arc) -> Self { - Self { client } - } -} - -impl BeaconNode for AttestationGrpcClient { - fn produce_attestation( - &self, - slot: Slot, - shard: u64, - ) -> Result, BeaconNodeError> { - let mut req = ProduceAttestationRequest::new(); - req.set_slot(slot.as_u64()); - req.set_shard(shard); - - let reply = self - .client - .produce_attestation(&req) - .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; - - // TODO: return correct Attestation - Err(BeaconNodeError::DecodeFailure) - } - - fn publish_attestation( - &self, - free_attestation: FreeAttestation, - ) -> Result { - // TODO: return correct PublishOutcome - Err(BeaconNodeError::DecodeFailure) - } -} diff --git a/validator_client/src/attester_service/mod.rs b/validator_client/src/attester_service/mod.rs deleted file mode 100644 index fe5de7647f..0000000000 --- a/validator_client/src/attester_service/mod.rs +++ /dev/null @@ -1,54 +0,0 @@ -mod attestation_grpc_client; -use attester::{Attester, BeaconNode, DutiesReader, PollOutcome as AttesterPollOutcome, Signer}; -use slog::{error, info, warn, Logger}; -use slot_clock::SlotClock; -use std::time::Duration; - -pub use self::attestation_grpc_client::AttestationGrpcClient; - -pub struct AttesterService 
{ - pub attester: Attester, - pub poll_interval_millis: u64, - pub log: Logger, -} - -impl AttesterService { - /// Run a loop which polls the Attester each `poll_interval_millis` millseconds. - /// - /// Logs the results of the polls. - pub fn run(&mut self) { - loop { - match self.attester.poll() { - Err(error) => { - error!(self.log, "Attester poll error"; "error" => format!("{:?}", error)) - } - Ok(AttesterPollOutcome::AttestationProduced(slot)) => { - info!(self.log, "Produced Attestation"; "slot" => slot) - } - Ok(AttesterPollOutcome::SlashableAttestationNotProduced(slot)) => { - warn!(self.log, "Slashable attestation was not produced"; "slot" => slot) - } - Ok(AttesterPollOutcome::AttestationNotRequired(slot)) => { - info!(self.log, "Attestation not required"; "slot" => slot) - } - Ok(AttesterPollOutcome::ProducerDutiesUnknown(slot)) => { - error!(self.log, "Attestation duties unknown"; "slot" => slot) - } - Ok(AttesterPollOutcome::SlotAlreadyProcessed(slot)) => { - warn!(self.log, "Attempted to re-process slot"; "slot" => slot) - } - Ok(AttesterPollOutcome::BeaconNodeUnableToProduceAttestation(slot)) => { - error!(self.log, "Beacon node unable to produce attestation"; "slot" => slot) - } - Ok(AttesterPollOutcome::SignerRejection(slot)) => { - error!(self.log, "The cryptographic signer refused to sign the attestation"; "slot" => slot) - } - Ok(AttesterPollOutcome::ValidatorIsUnknown(slot)) => { - error!(self.log, "The Beacon Node does not recognise the validator"; "slot" => slot) - } - }; - - std::thread::sleep(Duration::from_millis(self.poll_interval_millis)); - } - } -} diff --git a/validator_client/src/block_producer/beacon_node_block.rs b/validator_client/src/block_producer/beacon_node_block.rs new file mode 100644 index 0000000000..65ccb21047 --- /dev/null +++ b/validator_client/src/block_producer/beacon_node_block.rs @@ -0,0 +1,31 @@ +use types::{BeaconBlock, Signature, Slot}; +#[derive(Debug, PartialEq, Clone)] +pub enum BeaconNodeError { + 
RemoteFailure(String), + DecodeFailure, +} + +#[derive(Debug, PartialEq, Clone)] +pub enum PublishOutcome { + Valid, + InvalidBlock(String), + InvalidAttestation(String), +} + +/// Defines the methods required to produce and publish blocks on a Beacon Node. Abstracts the +/// actual beacon node. +pub trait BeaconNodeBlock: Send + Sync { + /// Request that the node produces a block. + /// + /// Returns Ok(None) if the Beacon Node is unable to produce at the given slot. + fn produce_beacon_block( + &self, + slot: Slot, + randao_reveal: &Signature, + ) -> Result, BeaconNodeError>; + + /// Request that the node publishes a block. + /// + /// Returns `true` if the publish was successful. + fn publish_beacon_block(&self, block: BeaconBlock) -> Result; +} diff --git a/validator_client/src/block_producer_service/beacon_block_grpc_client.rs b/validator_client/src/block_producer/grpc.rs similarity index 85% rename from validator_client/src/block_producer_service/beacon_block_grpc_client.rs rename to validator_client/src/block_producer/grpc.rs index e009a5d975..1c4977bac4 100644 --- a/validator_client/src/block_producer_service/beacon_block_grpc_client.rs +++ b/validator_client/src/block_producer/grpc.rs @@ -1,4 +1,4 @@ -use block_proposer::{BeaconNode, BeaconNodeError, PublishOutcome}; +use super::beacon_node_block::*; use protos::services::{ BeaconBlock as GrpcBeaconBlock, ProduceBeaconBlockRequest, PublishBeaconBlockRequest, }; @@ -7,6 +7,7 @@ use ssz::{decode, ssz_encode}; use std::sync::Arc; use types::{BeaconBlock, Signature, Slot}; +//TODO: Remove this new type. Do not need to wrap /// A newtype designed to wrap the gRPC-generated service so the `BeaconNode` trait may be /// implemented upon it. pub struct BeaconBlockGrpcClient { @@ -19,7 +20,7 @@ impl BeaconBlockGrpcClient { } } -impl BeaconNode for BeaconBlockGrpcClient { +impl BeaconNodeBlock for BeaconBlockGrpcClient { /// Request a Beacon Node (BN) to produce a new block at the supplied slot. 
/// /// Returns `None` if it is not possible to produce at the supplied slot. For example, if the @@ -27,17 +28,20 @@ impl BeaconNode for BeaconBlockGrpcClient { fn produce_beacon_block( &self, slot: Slot, - // TODO: use randao_reveal, when proto APIs have been updated. - _randao_reveal: &Signature, + randao_reveal: &Signature, ) -> Result, BeaconNodeError> { + // request a beacon block from the node let mut req = ProduceBeaconBlockRequest::new(); req.set_slot(slot.as_u64()); + req.set_randao_reveal(ssz_encode(randao_reveal)); + //TODO: Determine if we want an explicit timeout let reply = self .client .produce_beacon_block(&req) .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; + // format the reply if reply.has_block() { let block = reply.get_block(); let ssz = block.get_ssz(); @@ -59,7 +63,6 @@ impl BeaconNode for BeaconBlockGrpcClient { let ssz = ssz_encode(&block); - // TODO: this conversion is incomplete; fix it. let mut grpc_block = GrpcBeaconBlock::new(); grpc_block.set_ssz(ssz); @@ -71,7 +74,7 @@ impl BeaconNode for BeaconBlockGrpcClient { .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; if reply.get_success() { - Ok(PublishOutcome::ValidBlock) + Ok(PublishOutcome::Valid) } else { // TODO: distinguish between different errors Ok(PublishOutcome::InvalidBlock("Publish failed".to_string())) diff --git a/validator_client/src/block_producer/mod.rs b/validator_client/src/block_producer/mod.rs new file mode 100644 index 0000000000..8b4f5abda0 --- /dev/null +++ b/validator_client/src/block_producer/mod.rs @@ -0,0 +1,242 @@ +mod beacon_node_block; +mod grpc; + +use self::beacon_node_block::BeaconNodeBlock; +pub use self::beacon_node_block::{BeaconNodeError, PublishOutcome}; +pub use self::grpc::BeaconBlockGrpcClient; +use crate::signer::Signer; +use slog::{error, info, warn}; +use ssz::{SignedRoot, TreeHash}; +use std::sync::Arc; +use types::{BeaconBlock, ChainSpec, Domain, Fork, Slot}; + +#[derive(Debug, PartialEq)] +pub 
enum Error { + BeaconNodeError(BeaconNodeError), +} + +#[derive(Debug, PartialEq)] +pub enum ValidatorEvent { + /// A new block was produced. + BlockProduced(Slot), + /// A new attestation was produced. + AttestationProduced(Slot), + /// A block was not produced as it would have been slashable. + SlashableBlockNotProduced(Slot), + /// An attestation was not produced as it would have been slashable. + SlashableAttestationNotProduced(Slot), + /// The Beacon Node was unable to produce a block at that slot. + BeaconNodeUnableToProduceBlock(Slot), + /// The signer failed to sign the message. + SignerRejection(Slot), + /// Publishing an attestation failed. + PublishAttestationFailed, + /// Beacon node rejected the attestation. + InvalidAttestation, +} + +/// This struct contains the logic for requesting and signing beacon blocks for a validator. The +/// validator can abstractly sign via the Signer trait object. +pub struct BlockProducer<'a, B: BeaconNodeBlock, S: Signer> { + /// The current fork. + pub fork: Fork, + /// The current slot to produce a block for. + pub slot: Slot, + /// The current epoch. + pub spec: Arc, + /// The beacon node to connect to. + pub beacon_node: Arc, + /// The signer to sign the block. + pub signer: &'a S, +} + +impl<'a, B: BeaconNodeBlock, S: Signer> BlockProducer<'a, B, S> { + /// Handle outputs and results from block production. 
+ pub fn handle_produce_block(&mut self, log: slog::Logger) { + match self.produce_block() { + Ok(ValidatorEvent::BlockProduced(_slot)) => { + info!(log, "Block produced"; "Validator" => format!("{}", self.signer)) + } + Err(e) => error!(log, "Block production error"; "Error" => format!("{:?}", e)), + Ok(ValidatorEvent::SignerRejection(_slot)) => { + error!(log, "Block production error"; "Error" => "Signer Could not sign the block".to_string()) + } + Ok(ValidatorEvent::SlashableBlockNotProduced(_slot)) => { + error!(log, "Block production error"; "Error" => "Rejected the block as it could have been slashed".to_string()) + } + Ok(ValidatorEvent::BeaconNodeUnableToProduceBlock(_slot)) => { + error!(log, "Block production error"; "Error" => "Beacon node was unable to produce a block".to_string()) + } + Ok(v) => { + warn!(log, "Unknown result for block production"; "Error" => format!("{:?}",v)) + } + } + } + + /// Produce a block at some slot. + /// + /// Assumes that a block is required at this slot (does not check the duties). + /// + /// Ensures the message is not slashable. + /// + /// !!! UNSAFE !!! + /// + /// The slash-protection code is not yet implemented. There is zero protection against + /// slashing. + pub fn produce_block(&mut self) -> Result { + let epoch = self.slot.epoch(self.spec.slots_per_epoch); + + let message = epoch.hash_tree_root(); + let randao_reveal = match self.signer.sign_message( + &message, + self.spec.get_domain(epoch, Domain::Randao, &self.fork), + ) { + None => return Ok(ValidatorEvent::SignerRejection(self.slot)), + Some(signature) => signature, + }; + + if let Some(block) = self + .beacon_node + .produce_beacon_block(self.slot, &randao_reveal)? 
+ { + if self.safe_to_produce(&block) { + let domain = self.spec.get_domain(epoch, Domain::BeaconBlock, &self.fork); + if let Some(block) = self.sign_block(block, domain) { + self.beacon_node.publish_beacon_block(block)?; + Ok(ValidatorEvent::BlockProduced(self.slot)) + } else { + Ok(ValidatorEvent::SignerRejection(self.slot)) + } + } else { + Ok(ValidatorEvent::SlashableBlockNotProduced(self.slot)) + } + } else { + Ok(ValidatorEvent::BeaconNodeUnableToProduceBlock(self.slot)) + } + } + + /// Consumes a block, returning that block signed by the validators private key. + /// + /// Important: this function will not check to ensure the block is not slashable. This must be + /// done upstream. + fn sign_block(&mut self, mut block: BeaconBlock, domain: u64) -> Option { + self.store_produce(&block); + + match self.signer.sign_message(&block.signed_root()[..], domain) { + None => None, + Some(signature) => { + block.signature = signature; + Some(block) + } + } + } + + /// Returns `true` if signing a block is safe (non-slashable). + /// + /// !!! UNSAFE !!! + /// + /// Important: this function is presently stubbed-out. It provides ZERO SAFETY. + fn safe_to_produce(&self, _block: &BeaconBlock) -> bool { + // TODO: ensure the producer doesn't produce slashable blocks. + // https://github.com/sigp/lighthouse/issues/160 + true + } + + /// Record that a block was produced so that slashable votes may not be made in the future. + /// + /// !!! UNSAFE !!! + /// + /// Important: this function is presently stubbed-out. It provides ZERO SAFETY. + fn store_produce(&mut self, _block: &BeaconBlock) { + // TODO: record this block production to prevent future slashings. 
+ // https://github.com/sigp/lighthouse/issues/160 + } +} + +impl From for Error { + fn from(e: BeaconNodeError) -> Error { + Error::BeaconNodeError(e) + } +} + +/* Old tests - Re-work for new logic +#[cfg(test)] +mod tests { + use super::test_utils::{EpochMap, LocalSigner, SimulatedBeaconNode}; + use super::*; + use slot_clock::TestingSlotClock; + use types::{ + test_utils::{SeedableRng, TestRandom, XorShiftRng}, + Keypair, + }; + + // TODO: implement more thorough testing. + // https://github.com/sigp/lighthouse/issues/160 + // + // These tests should serve as a good example for future tests. + + #[test] + pub fn polling() { + let mut rng = XorShiftRng::from_seed([42; 16]); + + let spec = Arc::new(ChainSpec::foundation()); + let slot_clock = Arc::new(TestingSlotClock::new(0)); + let beacon_node = Arc::new(SimulatedBeaconNode::default()); + let signer = Arc::new(LocalSigner::new(Keypair::random())); + + let mut epoch_map = EpochMap::new(spec.slots_per_epoch); + let produce_slot = Slot::new(100); + let produce_epoch = produce_slot.epoch(spec.slots_per_epoch); + epoch_map.map.insert(produce_epoch, produce_slot); + let epoch_map = Arc::new(epoch_map); + + let mut block_proposer = BlockProducer::new( + spec.clone(), + epoch_map.clone(), + slot_clock.clone(), + beacon_node.clone(), + signer.clone(), + ); + + // Configure responses from the BeaconNode. + beacon_node.set_next_produce_result(Ok(Some(BeaconBlock::random_for_test(&mut rng)))); + beacon_node.set_next_publish_result(Ok(PublishOutcome::ValidBlock)); + + // One slot before production slot... + slot_clock.set_slot(produce_slot.as_u64() - 1); + assert_eq!( + block_proposer.poll(), + Ok(PollOutcome::BlockProductionNotRequired(produce_slot - 1)) + ); + + // On the produce slot... + slot_clock.set_slot(produce_slot.as_u64()); + assert_eq!( + block_proposer.poll(), + Ok(PollOutcome::BlockProduced(produce_slot.into())) + ); + + // Trying the same produce slot again... 
+ slot_clock.set_slot(produce_slot.as_u64()); + assert_eq!( + block_proposer.poll(), + Ok(PollOutcome::SlotAlreadyProcessed(produce_slot)) + ); + + // One slot after the produce slot... + slot_clock.set_slot(produce_slot.as_u64() + 1); + assert_eq!( + block_proposer.poll(), + Ok(PollOutcome::BlockProductionNotRequired(produce_slot + 1)) + ); + + // In an epoch without known duties... + let slot = (produce_epoch.as_u64() + 1) * spec.slots_per_epoch; + slot_clock.set_slot(slot); + assert_eq!( + block_proposer.poll(), + Ok(PollOutcome::ProducerDutiesUnknown(Slot::new(slot))) + ); + } +} +*/ diff --git a/validator_client/src/block_producer_service/mod.rs b/validator_client/src/block_producer_service/mod.rs deleted file mode 100644 index 91e7606a7f..0000000000 --- a/validator_client/src/block_producer_service/mod.rs +++ /dev/null @@ -1,61 +0,0 @@ -mod beacon_block_grpc_client; -// mod block_producer_service; - -use block_proposer::{ - BeaconNode, BlockProducer, DutiesReader, PollOutcome as BlockProducerPollOutcome, Signer, -}; -use slog::{error, info, warn, Logger}; -use slot_clock::SlotClock; -use std::time::Duration; - -pub use self::beacon_block_grpc_client::BeaconBlockGrpcClient; - -pub struct BlockProducerService { - pub block_producer: BlockProducer, - pub poll_interval_millis: u64, - pub log: Logger, -} - -impl BlockProducerService { - /// Run a loop which polls the block producer each `poll_interval_millis` millseconds. - /// - /// Logs the results of the polls. 
- pub fn run(&mut self) { - loop { - match self.block_producer.poll() { - Err(error) => { - error!(self.log, "Block producer poll error"; "error" => format!("{:?}", error)) - } - Ok(BlockProducerPollOutcome::BlockProduced(slot)) => { - info!(self.log, "Produced block"; "slot" => slot) - } - Ok(BlockProducerPollOutcome::SlashableBlockNotProduced(slot)) => { - warn!(self.log, "Slashable block was not signed"; "slot" => slot) - } - Ok(BlockProducerPollOutcome::BlockProductionNotRequired(slot)) => { - info!(self.log, "Block production not required"; "slot" => slot) - } - Ok(BlockProducerPollOutcome::ProducerDutiesUnknown(slot)) => { - error!(self.log, "Block production duties unknown"; "slot" => slot) - } - Ok(BlockProducerPollOutcome::SlotAlreadyProcessed(slot)) => { - warn!(self.log, "Attempted to re-process slot"; "slot" => slot) - } - Ok(BlockProducerPollOutcome::BeaconNodeUnableToProduceBlock(slot)) => { - error!(self.log, "Beacon node unable to produce block"; "slot" => slot) - } - Ok(BlockProducerPollOutcome::SignerRejection(slot)) => { - error!(self.log, "The cryptographic signer refused to sign the block"; "slot" => slot) - } - Ok(BlockProducerPollOutcome::ValidatorIsUnknown(slot)) => { - error!(self.log, "The Beacon Node does not recognise the validator"; "slot" => slot) - } - Ok(BlockProducerPollOutcome::UnableToGetFork(slot)) => { - error!(self.log, "Unable to get a `Fork` struct to generate signature domains"; "slot" => slot) - } - }; - - std::thread::sleep(Duration::from_millis(self.poll_interval_millis)); - } - } -} diff --git a/validator_client/src/duties/traits.rs b/validator_client/src/duties/beacon_node_duties.rs similarity index 74% rename from validator_client/src/duties/traits.rs rename to validator_client/src/duties/beacon_node_duties.rs index 374bed9f61..af1fab60bf 100644 --- a/validator_client/src/duties/traits.rs +++ b/validator_client/src/duties/beacon_node_duties.rs @@ -2,12 +2,12 @@ use super::EpochDuties; use types::{Epoch, PublicKey}; 
#[derive(Debug, PartialEq, Clone)] -pub enum BeaconNodeError { +pub enum BeaconNodeDutiesError { RemoteFailure(String), } /// Defines the methods required to obtain a validators shuffling from a Beacon Node. -pub trait BeaconNode: Send + Sync { +pub trait BeaconNodeDuties: Send + Sync { /// Gets the duties for all validators. /// /// Returns a vector of EpochDuties for each validator public key. The entry will be None for @@ -15,6 +15,6 @@ pub trait BeaconNode: Send + Sync { fn request_duties( &self, epoch: Epoch, - pubkeys: &[PublicKey], - ) -> Result; + pub_keys: &[PublicKey], + ) -> Result; } diff --git a/validator_client/src/duties/epoch_duties.rs b/validator_client/src/duties/epoch_duties.rs index 5c23e82b1e..692a8d6a62 100644 --- a/validator_client/src/duties/epoch_duties.rs +++ b/validator_client/src/duties/epoch_duties.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::fmt; use std::ops::{Deref, DerefMut}; use types::{AttestationDuty, Epoch, PublicKey, Slot}; @@ -22,9 +23,7 @@ pub struct WorkInfo { #[derive(Debug, PartialEq, Clone, Copy, Default)] pub struct EpochDuty { pub block_production_slot: Option, - pub attestation_slot: Slot, - pub attestation_shard: u64, - pub committee_index: u64, + pub attestation_duty: AttestationDuty, } impl EpochDuty { @@ -38,12 +37,8 @@ impl EpochDuty { // if the validator is required to attest to a shard, create the data let mut attestation_duty = None; - if self.attestation_slot == slot { - attestation_duty = Some(AttestationDuty { - slot, - shard: self.attestation_shard, - committee_index: self.committee_index as usize, - }); + if self.attestation_duty.slot == slot { + attestation_duty = Some(self.attestation_duty) } if produce_block | attestation_duty.is_some() { @@ -55,11 +50,25 @@ impl EpochDuty { None } } -/// Maps a list of public keys (many validators) to an EpochDuty. 
+ +impl fmt::Display for EpochDuty { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let mut display_block = String::from("None"); + if let Some(block_slot) = self.block_production_slot { + display_block = block_slot.to_string(); + } + write!( + f, + "produce block slot: {}, attestation slot: {}, attestation shard: {}", + display_block, self.attestation_duty.slot, self.attestation_duty.shard + ) + } +} + +/// Maps a list of keypairs (many validators) to an EpochDuty. pub type EpochDuties = HashMap>; pub enum EpochDutiesMapError { - Poisoned, UnknownEpoch, UnknownValidator, } @@ -98,7 +107,7 @@ impl EpochDutiesMap { pub fn is_work_slot( &self, slot: Slot, - pubkey: &PublicKey, + signer: &PublicKey, ) -> Result, EpochDutiesMapError> { let epoch = slot.epoch(self.slots_per_epoch); @@ -106,7 +115,7 @@ impl EpochDutiesMap { .map .get(&epoch) .ok_or_else(|| EpochDutiesMapError::UnknownEpoch)?; - if let Some(epoch_duty) = epoch_duties.get(pubkey) { + if let Some(epoch_duty) = epoch_duties.get(signer) { if let Some(duty) = epoch_duty { // Retrieves the duty for a validator at a given slot return Ok(duty.is_work_slot(slot)); diff --git a/validator_client/src/duties/grpc.rs b/validator_client/src/duties/grpc.rs index 511ffa34a2..58fb5c992d 100644 --- a/validator_client/src/duties/grpc.rs +++ b/validator_client/src/duties/grpc.rs @@ -1,26 +1,27 @@ +use super::beacon_node_duties::{BeaconNodeDuties, BeaconNodeDutiesError}; use super::epoch_duties::{EpochDuties, EpochDuty}; -use super::traits::{BeaconNode, BeaconNodeError}; -use grpcio::CallOption; +// to use if we manually specify a timeout +//use grpcio::CallOption; use protos::services::{GetDutiesRequest, Validators}; use protos::services_grpc::ValidatorServiceClient; use ssz::ssz_encode; use std::collections::HashMap; -use std::time::Duration; -use types::{Epoch, PublicKey, Slot}; +// use std::time::Duration; +use types::{AttestationDuty, Epoch, PublicKey, Slot}; -impl BeaconNode for ValidatorServiceClient { +impl 
BeaconNodeDuties for ValidatorServiceClient { /// Requests all duties (block signing and committee attesting) from the Beacon Node (BN). fn request_duties( &self, epoch: Epoch, - pubkeys: &[PublicKey], - ) -> Result { + pub_keys: &[PublicKey], + ) -> Result { // Get the required duties from all validators // build the request let mut req = GetDutiesRequest::new(); req.set_epoch(epoch.as_u64()); let mut validators = Validators::new(); - validators.set_public_keys(pubkeys.iter().map(|v| ssz_encode(v)).collect()); + validators.set_public_keys(pub_keys.iter().map(|v| ssz_encode(v)).collect()); req.set_validators(validators); // set a timeout for requests @@ -29,13 +30,13 @@ impl BeaconNode for ValidatorServiceClient { // send the request, get the duties reply let reply = self .get_validator_duties(&req) - .map_err(|err| BeaconNodeError::RemoteFailure(format!("{:?}", err)))?; + .map_err(|err| BeaconNodeDutiesError::RemoteFailure(format!("{:?}", err)))?; let mut epoch_duties: HashMap> = HashMap::new(); for (index, validator_duty) in reply.get_active_validators().iter().enumerate() { if !validator_duty.has_duty() { // validator is inactive - epoch_duties.insert(pubkeys[index].clone(), None); + epoch_duties.insert(pub_keys[index].clone(), None); continue; } // active validator @@ -47,13 +48,19 @@ impl BeaconNode for ValidatorServiceClient { None } }; + + let attestation_duty = AttestationDuty { + slot: Slot::from(active_duty.get_attestation_slot()), + shard: active_duty.get_attestation_shard(), + committee_index: active_duty.get_committee_index() as usize, + committee_len: active_duty.get_committee_len() as usize, + }; + let epoch_duty = EpochDuty { block_production_slot, - attestation_slot: Slot::from(active_duty.get_attestation_slot()), - attestation_shard: active_duty.get_attestation_shard(), - committee_index: active_duty.get_committee_index(), + attestation_duty, }; - epoch_duties.insert(pubkeys[index].clone(), Some(epoch_duty)); + 
epoch_duties.insert(pub_keys[index].clone(), Some(epoch_duty)); } Ok(epoch_duties) } diff --git a/validator_client/src/duties/mod.rs b/validator_client/src/duties/mod.rs index 0e962053e1..7db4672e30 100644 --- a/validator_client/src/duties/mod.rs +++ b/validator_client/src/duties/mod.rs @@ -1,15 +1,17 @@ +mod beacon_node_duties; mod epoch_duties; mod grpc; // TODO: reintroduce tests //#[cfg(test)] //mod test_node; -mod traits; +pub use self::beacon_node_duties::{BeaconNodeDuties, BeaconNodeDutiesError}; use self::epoch_duties::{EpochDuties, EpochDutiesMapError}; pub use self::epoch_duties::{EpochDutiesMap, WorkInfo}; -use self::traits::{BeaconNode, BeaconNodeError}; +use super::signer::Signer; use futures::Async; use slog::{debug, error, info}; +use std::fmt::Display; use std::sync::Arc; use std::sync::RwLock; use types::{Epoch, PublicKey, Slot}; @@ -28,8 +30,7 @@ pub enum UpdateOutcome { #[derive(Debug, PartialEq)] pub enum Error { DutiesMapPoisoned, - EpochMapPoisoned, - BeaconNodeError(BeaconNodeError), + BeaconNodeDutiesError(BeaconNodeDutiesError), UnknownEpoch, UnknownValidator, } @@ -38,19 +39,20 @@ pub enum Error { /// Node. /// /// This keeps track of all validator keys and required voting slots. -pub struct DutiesManager { +pub struct DutiesManager { pub duties_map: RwLock, - /// A list of all public keys known to the validator service. - pub pubkeys: Vec, + /// A list of all signer objects known to the validator service. + pub signers: Arc>, pub beacon_node: Arc, } -impl DutiesManager { +impl DutiesManager { /// Check the Beacon Node for `EpochDuties`. /// /// be a wall-clock (e.g., system time, remote server time, etc.). 
fn update(&self, epoch: Epoch) -> Result { - let duties = self.beacon_node.request_duties(epoch, &self.pubkeys)?; + let public_keys: Vec = self.signers.iter().map(|s| s.to_public()).collect(); + let duties = self.beacon_node.request_duties(epoch, &public_keys)?; { // If these duties were known, check to see if they're updates or identical. if let Some(known_duties) = self.duties_map.read()?.get(&epoch) { @@ -67,7 +69,7 @@ impl DutiesManager { // duties have changed //TODO: Duties could be large here. Remove from display and avoid the clone. self.duties_map.write()?.insert(epoch, duties.clone()); - return Ok(UpdateOutcome::DutiesChanged(epoch, duties)); + Ok(UpdateOutcome::DutiesChanged(epoch, duties)) } /// A future wrapping around `update()`. This will perform logic based upon the update @@ -82,25 +84,27 @@ impl DutiesManager { info!(log, "Duties changed (potential re-org)"; "epoch" => epoch, "duties" => format!("{:?}", duties)) } Ok(UpdateOutcome::NewDuties(epoch, duties)) => { - info!(log, "New duties obtained"; "epoch" => epoch, "duties" => format!("{:?}", duties)) + info!(log, "New duties obtained"; "epoch" => epoch); + print_duties(&log, duties); } }; Ok(Async::Ready(())) } - /// Returns a list of (Public, WorkInfo) indicating all the validators that have work to perform + /// Returns a list of (index, WorkInfo) indicating all the validators that have work to perform /// this slot. 
- pub fn get_current_work(&self, slot: Slot) -> Option> { - let mut current_work: Vec<(PublicKey, WorkInfo)> = Vec::new(); + pub fn get_current_work(&self, slot: Slot) -> Option> { + let mut current_work: Vec<(usize, WorkInfo)> = Vec::new(); // if the map is poisoned, return None let duties = self.duties_map.read().ok()?; - for validator_pk in &self.pubkeys { - match duties.is_work_slot(slot, &validator_pk) { - Ok(Some(work_type)) => current_work.push((validator_pk.clone(), work_type)), + for (index, validator_signer) in self.signers.iter().enumerate() { + match duties.is_work_slot(slot, &validator_signer.to_public()) { + Ok(Some(work_type)) => current_work.push((index, work_type)), Ok(None) => {} // No work for this validator - Err(_) => {} // Unknown epoch or validator, no work + //TODO: This should really log an error, as we shouldn't end up with an err here. + Err(_) => {} // Unknown epoch or validator, no work } } if current_work.is_empty() { @@ -111,9 +115,9 @@ impl DutiesManager { } //TODO: Use error_chain to handle errors -impl From for Error { - fn from(e: BeaconNodeError) -> Error { - Error::BeaconNodeError(e) +impl From for Error { + fn from(e: BeaconNodeDutiesError) -> Error { + Error::BeaconNodeDutiesError(e) } } @@ -126,13 +130,22 @@ impl From> for Error { impl From for Error { fn from(e: EpochDutiesMapError) -> Error { match e { - EpochDutiesMapError::Poisoned => Error::EpochMapPoisoned, EpochDutiesMapError::UnknownEpoch => Error::UnknownEpoch, EpochDutiesMapError::UnknownValidator => Error::UnknownValidator, } } } +fn print_duties(log: &slog::Logger, duties: EpochDuties) { + for (pk, duty) in duties.iter() { + if let Some(display_duty) = duty { + info!(log, "Validator: {}",pk; "Duty" => format!("{}",display_duty)); + } else { + info!(log, "Validator: {}",pk; "Duty" => "None"); + } + } +} + /* TODO: Modify tests for new Duties Manager form #[cfg(test)] mod tests { diff --git a/validator_client/src/main.rs b/validator_client/src/main.rs index 
d044030fe6..7a353e0dcc 100644 --- a/validator_client/src/main.rs +++ b/validator_client/src/main.rs @@ -1,14 +1,17 @@ -mod attester_service; -mod block_producer_service; +mod attestation_producer; +mod block_producer; mod config; mod duties; pub mod error; mod service; +mod signer; use crate::config::Config as ValidatorClientConfig; use clap::{App, Arg}; +use protos::services_grpc::ValidatorServiceClient; use service::Service as ValidatorService; use slog::{error, info, o, Drain}; +use types::Keypair; fn main() { // Logging @@ -52,7 +55,8 @@ fn main() { .expect("Unable to build a configuration for the validator client."); // start the validator service. - match ValidatorService::start(config, log.clone()) { + // this specifies the GRPC and signer type to use as the duty manager beacon node. + match ValidatorService::::start(config, log.clone()) { Ok(_) => info!(log, "Validator client shutdown successfully."), Err(e) => error!(log, "Validator exited due to: {}", e.to_string()), } diff --git a/validator_client/src/service.rs b/validator_client/src/service.rs index 83e7608550..ce19c23e93 100644 --- a/validator_client/src/service.rs +++ b/validator_client/src/service.rs @@ -1,14 +1,20 @@ -/// The validator service. Connects to a beacon node and signs blocks when required. -use crate::attester_service::{AttestationGrpcClient, AttesterService}; -use crate::block_producer_service::{BeaconBlockGrpcClient, BlockProducerService}; +/// The Validator Client service. +/// +/// Connects to a beacon node and negotiates the correct chain id. +/// +/// Once connected, the service loads known validators keypairs from disk. Every slot, +/// the service pings the beacon node, asking for new duties for each of the validators. +/// +/// When a validator needs to either produce a block or sign an attestation, it requests the +/// data from the beacon node and performs the signing before publishing the block to the beacon +/// node. 
+use crate::attestation_producer::AttestationProducer; +use crate::block_producer::{BeaconBlockGrpcClient, BlockProducer}; use crate::config::Config as ValidatorConfig; -use crate::duties::UpdateOutcome; -use crate::duties::{DutiesManager, EpochDutiesMap}; +use crate::duties::{BeaconNodeDuties, DutiesManager, EpochDutiesMap}; use crate::error as error_chain; use crate::error::ErrorKind; -use attester::test_utils::EpochMap; -use attester::{test_utils::LocalSigner as AttesterLocalSigner, Attester}; -use block_proposer::{test_utils::LocalSigner as BlockProposerLocalSigner, BlockProducer}; +use crate::signer::Signer; use bls::Keypair; use grpcio::{ChannelBuilder, EnvBuilder}; use protos::services::Empty; @@ -16,65 +22,64 @@ use protos::services_grpc::{ AttestationServiceClient, BeaconBlockServiceClient, BeaconNodeServiceClient, ValidatorServiceClient, }; -use slog::{debug, error, info, warn}; +use slog::{error, info, warn}; use slot_clock::{SlotClock, SystemTimeSlotClock}; use std::sync::Arc; use std::sync::RwLock; use std::time::{Duration, Instant, SystemTime}; use tokio::prelude::*; use tokio::runtime::Builder; -use tokio::timer::Interval; +use tokio::timer::{Delay, Interval}; use tokio_timer::clock::Clock; use types::test_utils::generate_deterministic_keypairs; -use types::{Epoch, Fork, Slot}; +use types::{ChainSpec, Epoch, Fork, Slot}; -//TODO: This service should be simplified in the future. Can be made more steamlined. +/// A fixed amount of time after a slot to perform operations. This gives the node time to complete +/// per-slot processes. +const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(200); /// The validator service. This is the main thread that executes and maintains validator /// duties. -pub struct Service { - /// The node we currently connected to. - connected_node_version: String, - /// The chain id we are processing on. - chain_id: u16, - /// The fork state we processing on. 
+//TODO: Generalize the BeaconNode types to use testing +pub struct Service { + /// The node's current fork version we are processing on. fork: Fork, /// The slot clock for this service. slot_clock: SystemTimeSlotClock, /// The current slot we are processing. current_slot: Slot, - /// The number of slots per epoch to allow for converting slots to epochs. - slots_per_epoch: u64, + /// The chain specification for this clients instance. + spec: Arc, + /// The duties manager which maintains the state of when to perform actions. + duties_manager: Arc>, // GRPC Clients /// The beacon block GRPC client. - beacon_block_client: Arc, - /// The validator GRPC client. - validator_client: Arc, + beacon_block_client: Arc, /// The attester GRPC client. - attester_client: Arc, + attestation_client: Arc, /// The validator client logger. log: slog::Logger, } -impl Service { +impl Service { /// Initial connection to the beacon node to determine its properties. /// /// This tries to connect to a beacon node. Once connected, it initialised the gRPC clients /// and returns an instance of the service. fn initialize_service( - config: &ValidatorConfig, + config: ValidatorConfig, log: slog::Logger, - ) -> error_chain::Result { + ) -> error_chain::Result> { // initialise the beacon node client to check for a connection let env = Arc::new(EnvBuilder::new().build()); // Beacon node gRPC beacon node endpoints. 
let beacon_node_client = { let ch = ChannelBuilder::new(env.clone()).connect(&config.server); - Arc::new(BeaconNodeServiceClient::new(ch)) + BeaconNodeServiceClient::new(ch) }; - // retrieve node information + // retrieve node information and validate the beacon node let node_info = loop { match beacon_node_client.info(&Empty::new()) { Err(e) => { @@ -84,18 +89,27 @@ impl Service { continue; } Ok(info) => { + // verify the node's genesis time if SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_secs() < info.genesis_time { - warn!( + error!( log, "Beacon Node's genesis time is in the future. No work to do.\n Exiting" ); return Err("Genesis time in the future".into()); } + // verify the node's chain id + if config.spec.chain_id != info.chain_id as u8 { + error!( + log, + "Beacon Node's genesis time is in the future. No work to do.\n Exiting" + ); + return Err(format!("Beacon node has the wrong chain id. Expected chain id: {}, node's chain id: {}", config.spec.chain_id, info.chain_id).into()); + } break info; } }; @@ -123,7 +137,9 @@ impl Service { // Beacon node gRPC beacon block endpoints. let beacon_block_client = { let ch = ChannelBuilder::new(env.clone()).connect(&config.server); - Arc::new(BeaconBlockServiceClient::new(ch)) + let beacon_block_service_client = Arc::new(BeaconBlockServiceClient::new(ch)); + // a wrapper around the service client to implement the beacon block node trait + Arc::new(BeaconBlockGrpcClient::new(beacon_block_service_client)) }; // Beacon node gRPC validator endpoints. @@ -133,7 +149,7 @@ impl Service { }; //Beacon node gRPC attester endpoints. - let attester_client = { + let attestation_client = { let ch = ChannelBuilder::new(env.clone()).connect(&config.server); Arc::new(AttestationServiceClient::new(ch)) }; @@ -145,27 +161,51 @@ impl Service { let current_slot = slot_clock .present_slot() - .map_err(|e| ErrorKind::SlotClockError(e))? + .map_err(ErrorKind::SlotClockError)? 
.expect("Genesis must be in the future"); - Ok(Self { - connected_node_version: node_info.version, - chain_id: node_info.chain_id as u16, + /* Generate the duties manager */ + + // generate keypairs + + // TODO: keypairs are randomly generated; they should be loaded from a file or generated. + // https://github.com/sigp/lighthouse/issues/160 + let keypairs = Arc::new(generate_deterministic_keypairs(8)); + + // Builds a mapping of Epoch -> Map(PublicKey, EpochDuty) + // where EpochDuty contains slot numbers and attestation data that each validator needs to + // produce work on. + let duties_map = RwLock::new(EpochDutiesMap::new(config.spec.slots_per_epoch)); + + // builds a manager which maintains the list of current duties for all known validators + // and can check when a validator needs to perform a task. + let duties_manager = Arc::new(DutiesManager { + duties_map, + // these are abstract objects capable of signing + signers: keypairs, + beacon_node: validator_client, + }); + + let spec = Arc::new(config.spec); + + Ok(Service { fork, slot_clock, current_slot, - slots_per_epoch: config.spec.slots_per_epoch, + spec, + duties_manager, beacon_block_client, - validator_client, - attester_client, + attestation_client, log, }) } /// Initialise the service then run the core thread. + // TODO: Improve handling of generic BeaconNode types, to stub grpcClient pub fn start(config: ValidatorConfig, log: slog::Logger) -> error_chain::Result<()> { // connect to the node and retrieve its properties and initialize the gRPC clients - let service = Service::initialize_service(&config, log)?; + let mut service = + Service::::initialize_service(config, log)?; // we have connected to a node and established its parameters. 
Spin up the core service @@ -185,144 +225,130 @@ impl Service { // set up the validator work interval - start at next slot and proceed every slot let interval = { // Set the interval to start at the next slot, and every slot after - let slot_duration = Duration::from_secs(config.spec.seconds_per_slot); + let slot_duration = Duration::from_secs(service.spec.seconds_per_slot); //TODO: Handle checked add correctly Interval::new(Instant::now() + duration_to_next_slot, slot_duration) }; - /* kick off core service */ - - // generate keypairs - - // TODO: keypairs are randomly generated; they should be loaded from a file or generated. - // https://github.com/sigp/lighthouse/issues/160 - let keypairs = Arc::new(generate_deterministic_keypairs(8)); - - /* build requisite objects to pass to core thread */ - - // Builds a mapping of Epoch -> Map(PublicKey, EpochDuty) - // where EpochDuty contains slot numbers and attestation data that each validator needs to - // produce work on. - let duties_map = RwLock::new(EpochDutiesMap::new(config.spec.slots_per_epoch)); - - // builds a manager which maintains the list of current duties for all known validators - // and can check when a validator needs to perform a task. 
- let manager = Arc::new(DutiesManager { - duties_map, - pubkeys: keypairs.iter().map(|keypair| keypair.pk.clone()).collect(), - beacon_node: service.validator_client.clone(), - }); - - // run the core thread + /* kick off the core service */ runtime.block_on( interval .for_each(move |_| { - let log = service.log.clone(); - - /* get the current slot and epoch */ - let current_slot = match service.slot_clock.present_slot() { - Err(e) => { - error!(log, "SystemTimeError {:?}", e); - return Ok(()); - } - Ok(slot) => slot.expect("Genesis is in the future"), - }; - - let current_epoch = current_slot.epoch(service.slots_per_epoch); - - debug_assert!( - current_slot > service.current_slot, - "The Timer should poll a new slot" - ); - - info!(log, "Processing slot: {}", current_slot.as_u64()); - - /* check for new duties */ - - let cloned_manager = manager.clone(); - let cloned_log = log.clone(); - // spawn a new thread separate to the runtime - std::thread::spawn(move || { - cloned_manager.run_update(current_epoch.clone(), cloned_log.clone()); - dbg!("Finished thread"); - }); - - /* execute any specified duties */ - - if let Some(work) = manager.get_current_work(current_slot) { - for (_public_key, work_type) in work { - if work_type.produce_block { - // TODO: Produce a beacon block in a new thread - } - if work_type.attestation_duty.is_some() { - // available AttestationDuty info - let attestation_duty = - work_type.attestation_duty.expect("Cannot be None"); - //TODO: Produce an attestation in a new thread - } - } - } - + // wait for node to process + std::thread::sleep(TIME_DELAY_FROM_SLOT); + // if a non-fatal error occurs, proceed to the next slot. + let _ignore_error = service.per_slot_execution(); + // completed a slot process Ok(()) }) .map_err(|e| format!("Service thread failed: {:?}", e)), - ); - - // completed a slot process + )?; + // validator client exited Ok(()) } - /* + /// The execution logic that runs every slot. 
+ // Errors are logged to output, and core execution continues unless fatal errors occur. + fn per_slot_execution(&mut self) -> error_chain::Result<()> { + /* get the new current slot and epoch */ + self.update_current_slot()?; - // Spawn a new thread to perform block production for the validator. - let producer_thread = { - let spec = spec.clone(); - let signer = Arc::new(BlockProposerLocalSigner::new(keypair.clone())); - let duties_map = duties_map.clone(); - let slot_clock = slot_clock.clone(); - let log = log.clone(); - let client = Arc::new(BeaconBlockGrpcClient::new(beacon_block_grpc_client.clone())); - thread::spawn(move || { - let block_producer = - BlockProducer::new(spec, duties_map, slot_clock, client, signer); - let mut block_producer_service = BlockProducerService { - block_producer, - poll_interval_millis, - log, - }; + /* check for new duties */ + self.check_for_duties(); - block_producer_service.run(); - }) - }; + /* process any required duties for validators */ + self.process_duties(); - // Spawn a new thread for attestation for the validator. - let attester_thread = { - let signer = Arc::new(AttesterLocalSigner::new(keypair.clone())); - let epoch_map = epoch_map_for_attester.clone(); - let slot_clock = slot_clock.clone(); - let log = log.clone(); - let client = Arc::new(AttestationGrpcClient::new(attester_grpc_client.clone())); - thread::spawn(move || { - let attester = Attester::new(epoch_map, slot_clock, client, signer); - let mut attester_service = AttesterService { - attester, - poll_interval_millis, - log, - }; - - attester_service.run(); - }) - }; - - threads.push((duties_manager_thread, producer_thread, attester_thread)); + Ok(()) } - // Naively wait for all the threads to complete. - for tuple in threads { - let (manager, producer, attester) = tuple; - let _ = producer.join(); - let _ = manager.join(); - let _ = attester.join(); + /// Updates the known current slot and epoch. 
+ fn update_current_slot(&mut self) -> error_chain::Result<()> { + let current_slot = match self.slot_clock.present_slot() { + Err(e) => { + error!(self.log, "SystemTimeError {:?}", e); + return Err("Could not read system time".into()); + } + Ok(slot) => slot.expect("Genesis is in the future"), + }; + + let current_epoch = current_slot.epoch(self.spec.slots_per_epoch); + + // this is a fatal error. If the slot clock repeats, there is something wrong with + // the timer, terminate immediately. + assert!( + current_slot > self.current_slot, + "The Timer should poll a new slot" + ); + self.current_slot = current_slot; + info!(self.log, "Processing"; "slot" => current_slot.as_u64(), "epoch" => current_epoch.as_u64()); + Ok(()) + } + + /// For all known validator keypairs, update any known duties from the beacon node. + fn check_for_duties(&mut self) { + let cloned_manager = self.duties_manager.clone(); + let cloned_log = self.log.clone(); + let current_epoch = self.current_slot.epoch(self.spec.slots_per_epoch); + // spawn a new thread separate to the runtime + // TODO: Handle thread termination/timeout + // TODO: Add duties thread back in, with channel to process duties in duty change. + // leave sequential for now. + //std::thread::spawn(move || { + // the return value is a future which returns ready. + // built to be compatible with the tokio runtime. + let _empty = cloned_manager.run_update(current_epoch, cloned_log.clone()); + //}); + } + + /// If there are any duties to process, spawn a separate thread and perform required actions. 
+ fn process_duties(&mut self) { + if let Some(work) = self.duties_manager.get_current_work(self.current_slot) { + for (signer_index, work_type) in work { + if work_type.produce_block { + // we need to produce a block + // spawns a thread to produce a beacon block + let signers = self.duties_manager.signers.clone(); // this is an arc + let fork = self.fork.clone(); + let slot = self.current_slot; + let spec = self.spec.clone(); + let beacon_node = self.beacon_block_client.clone(); + let log = self.log.clone(); + std::thread::spawn(move || { + info!(log, "Producing a block"; "Validator"=> format!("{}", signers[signer_index])); + let signer = &signers[signer_index]; + let mut block_producer = BlockProducer { + fork, + slot, + spec, + beacon_node, + signer, + }; + block_producer.handle_produce_block(log); + }); + } + if work_type.attestation_duty.is_some() { + // we need to produce an attestation + // spawns a thread to produce and sign an attestation + let signers = self.duties_manager.signers.clone(); // this is an arc + let fork = self.fork.clone(); + let spec = self.spec.clone(); + let beacon_node = self.attestation_client.clone(); + let log = self.log.clone(); + std::thread::spawn(move || { + info!(log, "Producing an attestation"; "Validator"=> format!("{}", signers[signer_index])); + let signer = &signers[signer_index]; + let mut attestation_producer = AttestationProducer { + fork, + duty: work_type.attestation_duty.expect("Should never be none"), + spec, + beacon_node, + signer, + }; + attestation_producer.handle_produce_attestation(log); + }); + } + } + } } - */ } diff --git a/validator_client/src/signer.rs b/validator_client/src/signer.rs new file mode 100644 index 0000000000..018142322f --- /dev/null +++ b/validator_client/src/signer.rs @@ -0,0 +1,21 @@ +use std::fmt::Display; +use types::{Keypair, PublicKey, Signature}; + +/// Signs message using an internally-maintained private key. 
+pub trait Signer: Display + Send + Sync + Clone { + fn sign_message(&self, message: &[u8], domain: u64) -> Option; + /// Returns a public key for the signer object. + fn to_public(&self) -> PublicKey; +} + +/* Implements Display and Signer for Keypair */ + +impl Signer for Keypair { + fn to_public(&self) -> PublicKey { + self.pk.clone() + } + + fn sign_message(&self, message: &[u8], domain: u64) -> Option { + Some(Signature::new(message, domain, &self.sk)) + } +}