diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 3cdcb7f9aa..fae8942aa9 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -454,7 +454,7 @@ impl BeaconChain { /// Returns the validator index (if any) for the given public key. /// /// Information is retrieved from the present `beacon_state.validators`. - pub fn validator_index(&self, pubkey: &PublicKey) -> Option { + pub fn validator_index(&self, pubkey: &PublicKeyBytes) -> Option { for (i, validator) in self.head().beacon_state.validators.iter().enumerate() { if validator.pubkey == *pubkey { return Some(i); diff --git a/beacon_node/rest_api/src/helpers.rs b/beacon_node/rest_api/src/helpers.rs index 55d9025a3c..811c898057 100644 --- a/beacon_node/rest_api/src/helpers.rs +++ b/beacon_node/rest_api/src/helpers.rs @@ -1,6 +1,6 @@ use crate::{ApiError, ApiResult}; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use bls::PublicKey; +use bls::PublicKeyBytes; use eth2_libp2p::{PubsubMessage, Topic}; use eth2_libp2p::{ BEACON_ATTESTATION_TOPIC, BEACON_BLOCK_TOPIC, TOPIC_ENCODING_POSTFIX, TOPIC_PREFIX, @@ -99,12 +99,12 @@ pub fn parse_root(string: &str) -> Result { } /// Parse a PublicKey from a `0x` prefixed hex string -pub fn parse_pubkey(string: &str) -> Result { +pub fn parse_pubkey_bytes(string: &str) -> Result { const PREFIX: &str = "0x"; if string.starts_with(PREFIX) { let pubkey_bytes = hex::decode(string.trim_start_matches(PREFIX)) .map_err(|e| ApiError::BadRequest(format!("Invalid hex string: {:?}", e)))?; - let pubkey = PublicKey::from_bytes(pubkey_bytes.as_slice()).map_err(|e| { + let pubkey = PublicKeyBytes::from_bytes(pubkey_bytes.as_slice()).map_err(|e| { ApiError::BadRequest(format!("Unable to deserialize public key: {:?}.", e)) })?; Ok(pubkey) diff --git a/beacon_node/rest_api/src/lib.rs b/beacon_node/rest_api/src/lib.rs index 58493d0322..54e805e28a 100644 --- 
a/beacon_node/rest_api/src/lib.rs +++ b/beacon_node/rest_api/src/lib.rs @@ -37,7 +37,7 @@ use tokio::runtime::TaskExecutor; use tokio::sync::mpsc; use url_query::UrlQuery; -pub use crate::helpers::parse_pubkey; +pub use crate::helpers::parse_pubkey_bytes; pub use beacon::{BlockResponse, HeadResponse, StateResponse}; pub use config::Config; pub use validator::{BulkValidatorDutiesRequest, ValidatorDuty}; diff --git a/beacon_node/rest_api/src/validator.rs b/beacon_node/rest_api/src/validator.rs index b9a8f9c4e9..5b8c241b6c 100644 --- a/beacon_node/rest_api/src/validator.rs +++ b/beacon_node/rest_api/src/validator.rs @@ -1,5 +1,5 @@ use crate::helpers::{ - check_content_type_for_json, parse_pubkey, publish_attestation_to_network, + check_content_type_for_json, parse_pubkey_bytes, publish_attestation_to_network, publish_beacon_block_to_network, }; use crate::response_builder::ResponseBuilder; @@ -7,7 +7,7 @@ use crate::{ApiError, ApiResult, BoxFut, NetworkChannel, UrlQuery}; use beacon_chain::{ AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BlockProcessingOutcome, }; -use bls::PublicKey; +use bls::PublicKeyBytes; use futures::future::Future; use futures::stream::Stream; use hyper::{Body, Request}; @@ -21,7 +21,7 @@ use types::{Attestation, BeaconBlock, CommitteeIndex, Epoch, RelativeEpoch, Slot #[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] pub struct ValidatorDuty { /// The validator's BLS public key, uniquely identifying them. _48-bytes, hex encoded with 0x prefix, case insensitive._ - pub validator_pubkey: PublicKey, + pub validator_pubkey: PublicKeyBytes, /// The slot at which the validator must attest. pub attestation_slot: Option, /// The index of the committee within `slot` of which the validator is a member. 
@@ -35,7 +35,7 @@ pub struct ValidatorDuty { #[derive(PartialEq, Debug, Serialize, Deserialize, Clone, Encode, Decode)] pub struct BulkValidatorDutiesRequest { pub epoch: Epoch, - pub pubkeys: Vec, + pub pubkeys: Vec, } /// HTTP Handler to retrieve a the duties for a set of validators during a particular epoch. This @@ -60,7 +60,11 @@ pub fn post_validator_duties( }) }) .and_then(|bulk_request| { - return_validator_duties(beacon_chain, bulk_request.epoch, bulk_request.pubkeys) + return_validator_duties( + beacon_chain, + bulk_request.epoch, + bulk_request.pubkeys.into_iter().map(Into::into).collect(), + ) }) .and_then(|duties| response_builder?.body_no_ssz(&duties)); @@ -80,7 +84,7 @@ pub fn get_validator_duties( let validator_pubkeys = query .all_of("validator_pubkeys")? .iter() - .map(|validator_pubkey_str| parse_pubkey(validator_pubkey_str)) + .map(|validator_pubkey_str| parse_pubkey_bytes(validator_pubkey_str)) .collect::>()?; let duties = return_validator_duties(beacon_chain, epoch, validator_pubkeys)?; @@ -91,7 +95,7 @@ pub fn get_validator_duties( fn return_validator_duties( beacon_chain: Arc>, epoch: Epoch, - validator_pubkeys: Vec, + validator_pubkeys: Vec, ) -> Result, ApiError> { let slots_per_epoch = T::EthSpec::slots_per_epoch(); let head_epoch = beacon_chain.head().beacon_state.current_epoch(); diff --git a/beacon_node/rest_api/tests/test.rs b/beacon_node/rest_api/tests/test.rs index a810d5acd6..61d3bbfa39 100644 --- a/beacon_node/rest_api/tests/test.rs +++ b/beacon_node/rest_api/tests/test.rs @@ -6,6 +6,7 @@ use node_test_rig::{ testing_client_config, ClientConfig, ClientGenesis, LocalBeaconNode, }; use remote_beacon_node::{PublishStatus, ValidatorDuty}; +use std::convert::TryInto; use std::sync::Arc; use tree_hash::TreeHash; use types::{ @@ -182,7 +183,7 @@ fn validator_duties_bulk() { .beacon_state .validators .iter() - .map(|v| v.pubkey.clone()) + .map(|v| (&v.pubkey).try_into().expect("pubkey should be valid")) .collect::>(); let duties = env @@ 
-219,7 +220,7 @@ fn validator_duties() { .beacon_state .validators .iter() - .map(|v| v.pubkey.clone()) + .map(|v| (&v.pubkey).try_into().expect("pubkey should be valid")) .collect::>(); let duties = env @@ -270,10 +271,16 @@ fn check_duties( .iter() .zip(duties.iter()) .for_each(|(validator, duty)| { - assert_eq!(*validator, duty.validator_pubkey, "pubkey should match"); + assert_eq!( + *validator, + (&duty.validator_pubkey) + .try_into() + .expect("should be valid pubkey"), + "pubkey should match" + ); let validator_index = state - .get_validator_index(validator) + .get_validator_index(&validator.clone().into()) .expect("should have pubkey cache") .expect("pubkey should exist"); diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index adce8c6f70..bc4b684c0e 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -4,9 +4,15 @@ version = "0.1.0" authors = ["Paul Hauner "] edition = "2018" +[[bench]] +name = "benches" +harness = false + [dev-dependencies] tempfile = "3.1.0" sloggers = "0.3.2" +criterion = "0.3.0" +rayon = "1.2.0" [dependencies] db-key = "0.0.5" diff --git a/beacon_node/store/benches/benches.rs b/beacon_node/store/benches/benches.rs new file mode 100644 index 0000000000..5a8675ecff --- /dev/null +++ b/beacon_node/store/benches/benches.rs @@ -0,0 +1,114 @@ +use criterion::Criterion; +use criterion::{black_box, criterion_group, criterion_main, Benchmark}; +use rayon::prelude::*; +use ssz::{Decode, Encode}; +use std::convert::TryInto; +use store::BeaconStateStorageContainer; +use types::{ + test_utils::generate_deterministic_keypair, BeaconState, Epoch, Eth1Data, EthSpec, Hash256, + MainnetEthSpec, Validator, +}; + +fn get_state(validator_count: usize) -> BeaconState { + let spec = &E::default_spec(); + let eth1_data = Eth1Data { + deposit_root: Hash256::zero(), + deposit_count: 0, + block_hash: Hash256::zero(), + }; + + let mut state = BeaconState::new(0, eth1_data, spec); + + for i in 0..validator_count { 
+ state.balances.push(i as u64).expect("should add balance"); + } + + state.validators = (0..validator_count) + .into_iter() + .collect::>() + .par_iter() + .map(|&i| Validator { + pubkey: generate_deterministic_keypair(i).pk.into(), + withdrawal_credentials: Hash256::from_low_u64_le(i as u64), + effective_balance: spec.max_effective_balance, + slashed: false, + activation_eligibility_epoch: Epoch::new(0), + activation_epoch: Epoch::new(0), + exit_epoch: Epoch::from(u64::max_value()), + withdrawable_epoch: Epoch::from(u64::max_value()), + }) + .collect::>() + .into(); + + state.build_all_caches(spec).expect("should build caches"); + + state +} + +fn all_benches(c: &mut Criterion) { + let validator_count = 16_384; + let state = get_state::(validator_count); + let storage_container = BeaconStateStorageContainer::new(&state); + let state_bytes = storage_container.as_ssz_bytes(); + + let inner_state = state.clone(); + c.bench( + &format!("{}_validators", validator_count), + Benchmark::new("encode/beacon_state", move |b| { + b.iter_batched_ref( + || inner_state.clone(), + |state| black_box(BeaconStateStorageContainer::new(state).as_ssz_bytes()), + criterion::BatchSize::SmallInput, + ) + }) + .sample_size(10), + ); + + let inner_state = state.clone(); + c.bench( + &format!("{}_validators", validator_count), + Benchmark::new("encode/beacon_state/tree_hash_cache", move |b| { + b.iter_batched_ref( + || inner_state.tree_hash_cache.clone(), + |tree_hash_cache| black_box(tree_hash_cache.as_ssz_bytes()), + criterion::BatchSize::SmallInput, + ) + }) + .sample_size(10), + ); + + let inner_state = state.clone(); + c.bench( + &format!("{}_validators", validator_count), + Benchmark::new("encode/beacon_state/committee_cache[0]", move |b| { + b.iter_batched_ref( + || inner_state.committee_caches[0].clone(), + |committee_cache| black_box(committee_cache.as_ssz_bytes()), + criterion::BatchSize::SmallInput, + ) + }) + .sample_size(10), + ); + + c.bench( + &format!("{}_validators", 
validator_count), + Benchmark::new("decode/beacon_state", move |b| { + b.iter_batched_ref( + || state_bytes.clone(), + |bytes| { + let state: BeaconState = + BeaconStateStorageContainer::from_ssz_bytes(&bytes) + .expect("should decode") + .try_into() + .expect("should convert into state"); + black_box(state) + }, + criterion::BatchSize::SmallInput, + ) + }) + .sample_size(10), + ); +} + +criterion_group!(benches, all_benches,); +criterion_main!(benches); diff --git a/beacon_node/store/examples/ssz_encode_state.rs b/beacon_node/store/examples/ssz_encode_state.rs new file mode 100644 index 0000000000..f36f85d063 --- /dev/null +++ b/beacon_node/store/examples/ssz_encode_state.rs @@ -0,0 +1,63 @@ +//! These examples only really exist so we can use them for flamegraph. If they get annoying to +//! maintain, feel free to delete. + +use rayon::prelude::*; +use ssz::{Decode, Encode}; +use std::convert::TryInto; +use store::BeaconStateStorageContainer; +use types::{ + test_utils::generate_deterministic_keypair, BeaconState, Epoch, Eth1Data, EthSpec, Hash256, + MainnetEthSpec, Validator, +}; + +type E = MainnetEthSpec; + +fn get_state(validator_count: usize) -> BeaconState { + let spec = &E::default_spec(); + let eth1_data = Eth1Data { + deposit_root: Hash256::zero(), + deposit_count: 0, + block_hash: Hash256::zero(), + }; + + let mut state = BeaconState::new(0, eth1_data, spec); + + for i in 0..validator_count { + state.balances.push(i as u64).expect("should add balance"); + } + + state.validators = (0..validator_count) + .into_iter() + .collect::>() + .par_iter() + .map(|&i| Validator { + pubkey: generate_deterministic_keypair(i).pk.into(), + withdrawal_credentials: Hash256::from_low_u64_le(i as u64), + effective_balance: spec.max_effective_balance, + slashed: false, + activation_eligibility_epoch: Epoch::new(0), + activation_epoch: Epoch::new(0), + exit_epoch: Epoch::from(u64::max_value()), + withdrawable_epoch: Epoch::from(u64::max_value()), + }) + .collect::>() + 
.into(); + + state.build_all_caches(spec).expect("should build caches"); + + state +} + +fn main() { + let validator_count = 1_024; + let state = get_state::(validator_count); + let storage_container = BeaconStateStorageContainer::new(&state); + + for _ in 0..1024 { + let container_bytes = storage_container.as_ssz_bytes(); + let _: BeaconState = BeaconStateStorageContainer::from_ssz_bytes(&container_bytes) + .expect("should decode") + .try_into() + .expect("should convert into state"); + } +} diff --git a/beacon_node/store/src/impls/beacon_state.rs b/beacon_node/store/src/impls/beacon_state.rs index 08ccec91cb..885abb593f 100644 --- a/beacon_node/store/src/impls/beacon_state.rs +++ b/beacon_node/store/src/impls/beacon_state.rs @@ -44,48 +44,51 @@ pub fn get_full_state( /// A container for storing `BeaconState` components. // TODO: would be more space efficient with the caches stored separately and referenced by hash #[derive(Encode, Decode)] -struct StorageContainer { - state_bytes: Vec, - committee_caches_bytes: Vec>, - tree_hash_cache_bytes: Vec, +pub struct StorageContainer { + state: BeaconState, + committee_caches: Vec, + tree_hash_cache: BeaconTreeHashCache, } -impl StorageContainer { +impl StorageContainer { /// Create a new instance for storing a `BeaconState`. 
- pub fn new(state: &BeaconState) -> Self { - let mut committee_caches_bytes = vec![]; + pub fn new(state: &BeaconState) -> Self { + let mut state = state.clone(); - for cache in state.committee_caches[..].iter() { - committee_caches_bytes.push(cache.as_ssz_bytes()); + let mut committee_caches = vec![CommitteeCache::default(); CACHED_EPOCHS]; + + for i in 0..CACHED_EPOCHS { + std::mem::swap(&mut state.committee_caches[i], &mut committee_caches[i]); } - let tree_hash_cache_bytes = state.tree_hash_cache.as_ssz_bytes(); + let tree_hash_cache = + std::mem::replace(&mut state.tree_hash_cache, BeaconTreeHashCache::default()); Self { - state_bytes: state.as_ssz_bytes(), - committee_caches_bytes, - tree_hash_cache_bytes, + state, + committee_caches, + tree_hash_cache, } } } -impl TryInto> for StorageContainer { +impl TryInto> for StorageContainer { type Error = Error; - fn try_into(self) -> Result, Error> { - let mut state: BeaconState = BeaconState::from_ssz_bytes(&self.state_bytes)?; + fn try_into(mut self) -> Result, Error> { + let mut state = self.state; - for i in 0..CACHED_EPOCHS { - let bytes = &self.committee_caches_bytes.get(i).ok_or_else(|| { - Error::SszDecodeError(DecodeError::BytesInvalid( + for i in (0..CACHED_EPOCHS).rev() { + if i >= self.committee_caches.len() { + return Err(Error::SszDecodeError(DecodeError::BytesInvalid( "Insufficient committees for BeaconState".to_string(), - )) - })?; + ))); + }; - state.committee_caches[i] = CommitteeCache::from_ssz_bytes(bytes)?; + state.committee_caches[i] = self.committee_caches.remove(i); } - state.tree_hash_cache = BeaconTreeHashCache::from_ssz_bytes(&self.tree_hash_cache_bytes)?; + state.tree_hash_cache = self.tree_hash_cache; Ok(state) } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 3698d2409f..786d2cac42 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -33,6 +33,7 @@ pub use self::memory_store::MemoryStore; pub use self::migrate::Migrate; pub use 
self::partial_beacon_state::PartialBeaconState; pub use errors::Error; +pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer; pub use metrics::scrape_for_metrics; pub use types::*; diff --git a/eth2/state_processing/src/per_block_processing.rs b/eth2/state_processing/src/per_block_processing.rs index e94ef413da..f6351e6715 100644 --- a/eth2/state_processing/src/per_block_processing.rs +++ b/eth2/state_processing/src/per_block_processing.rs @@ -440,7 +440,7 @@ pub fn process_deposit( }; // Get an `Option` where `u64` is the validator index if this deposit public key // already exists in the beacon_state. - let validator_index = get_existing_validator_index(state, &pubkey) + let validator_index = get_existing_validator_index(state, &deposit.data.pubkey) .map_err(|e| e.into_with_index(deposit_index))?; let amount = deposit.data.amount; @@ -457,7 +457,7 @@ pub fn process_deposit( // Create a new validator. let validator = Validator { - pubkey, + pubkey: pubkey.into(), withdrawal_credentials: deposit.data.withdrawal_credentials, activation_eligibility_epoch: spec.far_future_epoch, activation_epoch: spec.far_future_epoch, diff --git a/eth2/state_processing/src/per_block_processing/signature_sets.rs b/eth2/state_processing/src/per_block_processing/signature_sets.rs index d4ac169ef7..f765351880 100644 --- a/eth2/state_processing/src/per_block_processing/signature_sets.rs +++ b/eth2/state_processing/src/per_block_processing/signature_sets.rs @@ -2,7 +2,8 @@ //! validated individually, or alongside in others in a potentially cheaper bulk operation. //! //! This module exposes one function to extract each type of `SignatureSet` from a `BeaconBlock`. 
-use bls::{SignatureSet, SignedMessage}; +use bls::{G1Point, G1Ref, SignatureSet, SignedMessage}; +use std::borrow::Cow; use std::convert::TryInto; use tree_hash::{SignedRoot, TreeHash}; use types::{ @@ -26,6 +27,9 @@ pub enum Error { /// The public keys supplied do not match the number of objects requiring keys. Block validity /// was not determined. MismatchedPublicKeyLen { pubkey_len: usize, other_len: usize }, + /// The public key bytes stored in the `BeaconState` were not valid. This is a serious internal + /// error. + BadBlsBytes { validator_index: u64 }, } impl From for Error { @@ -42,10 +46,6 @@ pub fn block_proposal_signature_set<'a, T: EthSpec>( spec: &'a ChainSpec, ) -> Result> { let proposer_index = state.get_beacon_proposer_index(block.slot, spec)?; - let block_proposer = &state - .validators - .get(proposer_index) - .ok_or_else(|| Error::ValidatorUnknown(proposer_index as u64))?; let domain = spec.get_domain( block.slot.epoch(T::slots_per_epoch()), @@ -61,7 +61,7 @@ pub fn block_proposal_signature_set<'a, T: EthSpec>( Ok(SignatureSet::single( &block.signature, - &block_proposer.pubkey, + validator_pubkey(state, proposer_index)?, message, domain, )) @@ -73,7 +73,7 @@ pub fn randao_signature_set<'a, T: EthSpec>( block: &'a BeaconBlock, spec: &'a ChainSpec, ) -> Result> { - let block_proposer = &state.validators[state.get_beacon_proposer_index(block.slot, spec)?]; + let proposer_index = state.get_beacon_proposer_index(block.slot, spec)?; let domain = spec.get_domain( block.slot.epoch(T::slots_per_epoch()), @@ -85,7 +85,7 @@ pub fn randao_signature_set<'a, T: EthSpec>( Ok(SignatureSet::single( &block.body.randao_reveal, - &block_proposer.pubkey, + validator_pubkey(state, proposer_index)?, message, domain, )) @@ -97,14 +97,21 @@ pub fn proposer_slashing_signature_set<'a, T: EthSpec>( proposer_slashing: &'a ProposerSlashing, spec: &'a ChainSpec, ) -> Result<(SignatureSet<'a>, SignatureSet<'a>)> { - let proposer = state - .validators - 
.get(proposer_slashing.proposer_index as usize) - .ok_or_else(|| Error::ValidatorUnknown(proposer_slashing.proposer_index))?; + let proposer_index = proposer_slashing.proposer_index as usize; Ok(( - block_header_signature_set(state, &proposer_slashing.header_1, &proposer.pubkey, spec)?, - block_header_signature_set(state, &proposer_slashing.header_2, &proposer.pubkey, spec)?, + block_header_signature_set( + state, + &proposer_slashing.header_1, + validator_pubkey(state, proposer_index)?, + spec, + )?, + block_header_signature_set( + state, + &proposer_slashing.header_2, + validator_pubkey(state, proposer_index)?, + spec, + )?, )) } @@ -112,7 +119,7 @@ pub fn proposer_slashing_signature_set<'a, T: EthSpec>( fn block_header_signature_set<'a, T: EthSpec>( state: &'a BeaconState, header: &'a BeaconBlockHeader, - pubkey: &'a PublicKey, + pubkey: Cow<'a, G1Point>, spec: &'a ChainSpec, ) -> Result> { let domain = spec.get_domain( @@ -140,10 +147,13 @@ pub fn indexed_attestation_signature_set<'a, 'b, T: EthSpec>( ) -> Result> { let message = indexed_attestation.data.tree_hash_root(); - let signed_message = SignedMessage::new( - get_pubkeys(state, &indexed_attestation.attesting_indices)?, - message, - ); + let pubkeys = indexed_attestation + .attesting_indices + .into_iter() + .map(|&validator_idx| Ok(validator_pubkey(state, validator_idx as usize)?)) + .collect::>()?; + + let signed_message = SignedMessage::new(pubkeys, message); let domain = spec.get_domain( indexed_attestation.data.target.epoch, @@ -200,7 +210,7 @@ pub fn deposit_signature_set<'a>( // with the fork zeroed. 
SignatureSet::single( signature, - pubkey, + pubkey.g1_ref(), message.clone(), spec.get_deposit_domain(), ) @@ -213,10 +223,7 @@ pub fn exit_signature_set<'a, T: EthSpec>( exit: &'a VoluntaryExit, spec: &'a ChainSpec, ) -> Result> { - let validator = state - .validators - .get(exit.validator_index as usize) - .ok_or_else(|| Error::ValidatorUnknown(exit.validator_index))?; + let proposer_index = exit.validator_index as usize; let domain = spec.get_domain(exit.epoch, Domain::VoluntaryExit, &state.fork); @@ -224,29 +231,27 @@ pub fn exit_signature_set<'a, T: EthSpec>( Ok(SignatureSet::single( &exit.signature, - &validator.pubkey, + validator_pubkey(state, proposer_index)?, message, domain, )) } -/// Maps validator indices to public keys. -fn get_pubkeys<'a, 'b, T, I>( +/// Maps a validator index to a `PublicKey`. +fn validator_pubkey<'a, T: EthSpec>( state: &'a BeaconState, - validator_indices: I, -) -> Result> -where - I: IntoIterator, - T: EthSpec, -{ - validator_indices - .into_iter() - .map(|&validator_idx| { - state - .validators - .get(validator_idx as usize) - .ok_or_else(|| Error::ValidatorUnknown(validator_idx)) - .map(|validator| &validator.pubkey) + validator_index: usize, +) -> Result> { + let pubkey_bytes = &state + .validators + .get(validator_index) + .ok_or_else(|| Error::ValidatorUnknown(validator_index as u64))? 
+ .pubkey; + + pubkey_bytes + .try_into() + .map(|pubkey: PublicKey| Cow::Owned(pubkey.as_raw().point.clone())) + .map_err(|_| Error::BadBlsBytes { + validator_index: validator_index as u64, }) - .collect() } diff --git a/eth2/state_processing/src/per_block_processing/verify_deposit.rs b/eth2/state_processing/src/per_block_processing/verify_deposit.rs index 9992eb89ec..42f1f6ec6c 100644 --- a/eth2/state_processing/src/per_block_processing/verify_deposit.rs +++ b/eth2/state_processing/src/per_block_processing/verify_deposit.rs @@ -35,7 +35,7 @@ pub fn verify_deposit_signature(deposit_data: &DepositData, spec: &ChainSpec) -> /// Errors if the state's `pubkey_cache` is not current. pub fn get_existing_validator_index( state: &BeaconState, - pub_key: &PublicKey, + pub_key: &PublicKeyBytes, ) -> Result> { let validator_index = state.get_validator_index(pub_key)?; Ok(validator_index.map(|idx| idx as u64)) diff --git a/eth2/types/Cargo.toml b/eth2/types/Cargo.toml index 1edd5e54cc..e52ad80322 100644 --- a/eth2/types/Cargo.toml +++ b/eth2/types/Cargo.toml @@ -4,6 +4,10 @@ version = "0.1.0" authors = ["Paul Hauner ", "Age Manning "] edition = "2018" +[[bench]] +name = "benches" +harness = false + [dependencies] bls = { path = "../utils/bls" } compare_fields = { path = "../utils/compare_fields" } @@ -37,3 +41,4 @@ tempfile = "3.1.0" [dev-dependencies] env_logger = "0.7.1" serde_json = "1.0.41" +criterion = "0.3.0" diff --git a/eth2/types/benches/benches.rs b/eth2/types/benches/benches.rs new file mode 100644 index 0000000000..10d9df7eca --- /dev/null +++ b/eth2/types/benches/benches.rs @@ -0,0 +1,79 @@ +use criterion::Criterion; +use criterion::{black_box, criterion_group, criterion_main, Benchmark}; +use rayon::prelude::*; +use ssz::{Decode, Encode}; +use types::{ + test_utils::generate_deterministic_keypair, BeaconState, Eth1Data, EthSpec, Hash256, + MainnetEthSpec, Validator, +}; + +fn get_state(validator_count: usize) -> BeaconState { + let spec = &E::default_spec(); + 
let eth1_data = Eth1Data { + deposit_root: Hash256::zero(), + deposit_count: 0, + block_hash: Hash256::zero(), + }; + + let mut state = BeaconState::new(0, eth1_data, spec); + + for i in 0..validator_count { + state.balances.push(i as u64).expect("should add balance"); + } + + state.validators = (0..validator_count) + .into_iter() + .collect::>() + .par_iter() + .map(|&i| Validator { + pubkey: generate_deterministic_keypair(i).pk.into(), + withdrawal_credentials: Hash256::from_low_u64_le(i as u64), + effective_balance: i as u64, + slashed: i % 2 == 0, + activation_eligibility_epoch: i.into(), + activation_epoch: i.into(), + exit_epoch: i.into(), + withdrawable_epoch: i.into(), + }) + .collect::>() + .into(); + + state +} + +fn all_benches(c: &mut Criterion) { + let validator_count = 16_384; + let state = get_state::(validator_count); + let state_bytes = state.as_ssz_bytes(); + + c.bench( + &format!("{}_validators", validator_count), + Benchmark::new("encode/beacon_state", move |b| { + b.iter_batched_ref( + || state.clone(), + |state| black_box(state.as_ssz_bytes()), + criterion::BatchSize::SmallInput, + ) + }) + .sample_size(10), + ); + + c.bench( + &format!("{}_validators", validator_count), + Benchmark::new("decode/beacon_state", move |b| { + b.iter_batched_ref( + || state_bytes.clone(), + |bytes| { + let state: BeaconState = + BeaconState::from_ssz_bytes(&bytes).expect("should decode"); + black_box(state) + }, + criterion::BatchSize::SmallInput, + ) + }) + .sample_size(10), + ); +} + +criterion_group!(benches, all_benches,); +criterion_main!(benches); diff --git a/eth2/types/examples/ssz_encode_state.rs b/eth2/types/examples/ssz_encode_state.rs new file mode 100644 index 0000000000..826835306e --- /dev/null +++ b/eth2/types/examples/ssz_encode_state.rs @@ -0,0 +1,50 @@ +//! These examples only really exist so we can use them for flamegraph. If they get annoying to +//! maintain, feel free to delete. 
+ +use ssz::{Decode, Encode}; +use types::{ + test_utils::generate_deterministic_keypair, BeaconState, Eth1Data, EthSpec, Hash256, + MinimalEthSpec, Validator, +}; + +type E = MinimalEthSpec; + +fn get_state(validator_count: usize) -> BeaconState { + let spec = &E::default_spec(); + let eth1_data = Eth1Data { + deposit_root: Hash256::zero(), + deposit_count: 0, + block_hash: Hash256::zero(), + }; + + let mut state = BeaconState::new(0, eth1_data, spec); + + for i in 0..validator_count { + state.balances.push(i as u64).expect("should add balance"); + state + .validators + .push(Validator { + pubkey: generate_deterministic_keypair(i).pk.into(), + withdrawal_credentials: Hash256::from_low_u64_le(i as u64), + effective_balance: i as u64, + slashed: i % 2 == 0, + activation_eligibility_epoch: i.into(), + activation_epoch: i.into(), + exit_epoch: i.into(), + withdrawable_epoch: i.into(), + }) + .expect("should add validator"); + } + + state +} + +fn main() { + let validator_count = 1_024; + let state = get_state(validator_count); + + for _ in 0..1_024 { + let state_bytes = state.as_ssz_bytes(); + let _: BeaconState = BeaconState::from_ssz_bytes(&state_bytes).expect("should decode"); + } +} diff --git a/eth2/types/src/beacon_state.rs b/eth2/types/src/beacon_state.rs index 63d2a1f31f..4605144640 100644 --- a/eth2/types/src/beacon_state.rs +++ b/eth2/types/src/beacon_state.rs @@ -268,7 +268,7 @@ impl BeaconState { /// returns `None`. /// /// Requires a fully up-to-date `pubkey_cache`, returns an error if this is not the case. 
- pub fn get_validator_index(&self, pubkey: &PublicKey) -> Result, Error> { + pub fn get_validator_index(&self, pubkey: &PublicKeyBytes) -> Result, Error> { if self.pubkey_cache.len() == self.validators.len() { Ok(self.pubkey_cache.get(pubkey)) } else { @@ -860,7 +860,7 @@ impl BeaconState { .enumerate() .skip(self.pubkey_cache.len()) { - let success = self.pubkey_cache.insert(validator.pubkey.clone(), i); + let success = self.pubkey_cache.insert(validator.pubkey.clone().into(), i); if !success { return Err(Error::PubkeyCacheInconsistent); } diff --git a/eth2/types/src/beacon_state/pubkey_cache.rs b/eth2/types/src/beacon_state/pubkey_cache.rs index b601c3c11f..0063758a25 100644 --- a/eth2/types/src/beacon_state/pubkey_cache.rs +++ b/eth2/types/src/beacon_state/pubkey_cache.rs @@ -10,7 +10,7 @@ pub struct PubkeyCache { /// len, as it does not increase when duplicate keys are added. Duplicate keys are used during /// testing. len: usize, - map: HashMap, + map: HashMap, } impl PubkeyCache { @@ -23,7 +23,7 @@ impl PubkeyCache { /// /// The added index must equal the number of validators already added to the map. This ensures /// that an index is never skipped. - pub fn insert(&mut self, pubkey: PublicKey, index: ValidatorIndex) -> bool { + pub fn insert(&mut self, pubkey: PublicKeyBytes, index: ValidatorIndex) -> bool { if index == self.len { self.map.insert(pubkey, index); self.len += 1; @@ -34,7 +34,7 @@ impl PubkeyCache { } /// Looks up a validator index's by their public key. 
- pub fn get(&self, pubkey: &PublicKey) -> Option { + pub fn get(&self, pubkey: &PublicKeyBytes) -> Option { self.map.get(pubkey).copied() } } diff --git a/eth2/types/src/test_utils/builders/testing_beacon_state_builder.rs b/eth2/types/src/test_utils/builders/testing_beacon_state_builder.rs index a4b0cec658..04f22fa89b 100644 --- a/eth2/types/src/test_utils/builders/testing_beacon_state_builder.rs +++ b/eth2/types/src/test_utils/builders/testing_beacon_state_builder.rs @@ -109,7 +109,7 @@ impl TestingBeaconStateBuilder { )); Validator { - pubkey: keypair.pk.clone(), + pubkey: keypair.pk.clone().into(), withdrawal_credentials, // All validators start active. activation_eligibility_epoch: T::genesis_epoch(), diff --git a/eth2/types/src/validator.rs b/eth2/types/src/validator.rs index 70c3a69d97..5ece4cbf44 100644 --- a/eth2/types/src/validator.rs +++ b/eth2/types/src/validator.rs @@ -1,4 +1,4 @@ -use crate::{test_utils::TestRandom, Epoch, Hash256, PublicKey}; +use crate::{test_utils::TestRandom, Epoch, Hash256, PublicKeyBytes}; use serde_derive::{Deserialize, Serialize}; use ssz_derive::{Decode, Encode}; @@ -10,7 +10,7 @@ use tree_hash_derive::TreeHash; /// Spec v0.9.1 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Encode, Decode, TestRandom, TreeHash)] pub struct Validator { - pub pubkey: PublicKey, + pub pubkey: PublicKeyBytes, pub withdrawal_credentials: Hash256, pub effective_balance: u64, pub slashed: bool, @@ -46,7 +46,7 @@ impl Default for Validator { /// Yields a "default" `Validator`. Primarily used for testing. 
fn default() -> Self { Self { - pubkey: PublicKey::default(), + pubkey: PublicKeyBytes::empty(), withdrawal_credentials: Hash256::default(), activation_eligibility_epoch: Epoch::from(std::u64::MAX), activation_epoch: Epoch::from(std::u64::MAX), diff --git a/eth2/utils/bls/src/lib.rs b/eth2/utils/bls/src/lib.rs index b738c9b9b2..be80e0258b 100644 --- a/eth2/utils/bls/src/lib.rs +++ b/eth2/utils/bls/src/lib.rs @@ -14,7 +14,7 @@ pub use crate::public_key_bytes::PublicKeyBytes; pub use crate::secret_key::SecretKey; pub use crate::signature_bytes::SignatureBytes; pub use milagro_bls::{compress_g2, hash_on_g2, G1Point}; -pub use signature_set::{verify_signature_sets, SignatureSet, SignedMessage}; +pub use signature_set::{verify_signature_sets, G1Ref, SignatureSet, SignedMessage}; #[cfg(feature = "fake_crypto")] mod fake_aggregate_public_key; diff --git a/eth2/utils/bls/src/macros.rs b/eth2/utils/bls/src/macros.rs index 4acf185f0d..6809a3c956 100644 --- a/eth2/utils/bls/src/macros.rs +++ b/eth2/utils/bls/src/macros.rs @@ -128,6 +128,12 @@ macro_rules! 
bytes_struct { } } + impl std::hash::Hash for $name { + fn hash<H: std::hash::Hasher>(&self, state: &mut H) { + self.0.hash(state) + } + } + impl Eq for $name {} impl std::convert::TryInto<$type> for &$name { diff --git a/eth2/utils/bls/src/signature_set.rs b/eth2/utils/bls/src/signature_set.rs index df1636f1d4..27eebe1aff 100644 --- a/eth2/utils/bls/src/signature_set.rs +++ b/eth2/utils/bls/src/signature_set.rs @@ -1,5 +1,6 @@ use crate::{AggregatePublicKey, AggregateSignature, PublicKey, Signature}; use milagro_bls::{G1Point, G2Point}; +use std::borrow::Cow; #[cfg(not(feature = "fake_crypto"))] use milagro_bls::AggregateSignature as RawAggregateSignature; @@ -9,17 +10,14 @@ type Domain = u64; #[derive(Clone, Debug)] pub struct SignedMessage<'a> { - signing_keys: Vec<&'a G1Point>, + signing_keys: Vec<Cow<'a, G1Point>>, message: Message, } impl<'a> SignedMessage<'a> { - pub fn new<T>(signing_keys: Vec<&'a T>, message: Message) -> Self - where - T: G1Ref, - { + pub fn new(signing_keys: Vec<Cow<'a, G1Point>>, message: Message) -> Self { Self { - signing_keys: signing_keys.iter().map(|k| k.g1_ref()).collect(), + signing_keys, message, } } @@ -33,14 +31,13 @@ pub struct SignatureSet<'a> { } impl<'a> SignatureSet<'a> { - pub fn single<T, S>( + pub fn single<S>( signature: &'a S, - signing_key: &'a T, + signing_key: Cow<'a, G1Point>, message: Message, domain: Domain, ) -> Self where - T: G1Ref, S: G2Ref, { Self { @@ -53,13 +50,13 @@ pub fn dual<T, S>( signature: &'a S, message_0: Message, - message_0_signing_keys: Vec<&'a T>, + message_0_signing_keys: Vec<Cow<'a, G1Point>>, message_1: Message, - message_1_signing_keys: Vec<&'a T>, + message_1_signing_keys: Vec<Cow<'a, G1Point>>, domain: Domain, ) -> Self where - T: G1Ref, + T: G1Ref + Clone, S: G2Ref, { Self { @@ -95,7 +92,7 @@ messages.push(signed_message.message.clone()); let point = if signed_message.signing_keys.len() == 1 { - signed_message.signing_keys[0].clone() + signed_message.signing_keys[0].clone().into_owned() } else {
aggregate_public_keys(&signed_message.signing_keys) }; @@ -132,7 +129,7 @@ impl<'a> Into> for SignatureSet<'a> { .into_iter() .map(|signed_message| { let key = if signed_message.signing_keys.len() == 1 { - signed_message.signing_keys[0].clone() + signed_message.signing_keys[0].clone().into_owned() } else { aggregate_public_keys(&signed_message.signing_keys) }; @@ -146,12 +143,12 @@ impl<'a> Into> for SignatureSet<'a> { } /// Create an aggregate public key for a list of validators, failing if any key can't be found. -fn aggregate_public_keys<'a>(public_keys: &'a [&'a G1Point]) -> G1Point { +fn aggregate_public_keys<'a>(public_keys: &'a [Cow<'a, G1Point>]) -> G1Point { let mut aggregate = public_keys .iter() - .fold(AggregatePublicKey::new(), |mut aggregate, &pubkey| { - aggregate.add_point(pubkey); + .fold(AggregatePublicKey::new(), |mut aggregate, pubkey| { + aggregate.add_point(&pubkey); aggregate }); @@ -161,18 +158,18 @@ fn aggregate_public_keys<'a>(public_keys: &'a [&'a G1Point]) -> G1Point { } pub trait G1Ref { - fn g1_ref(&self) -> &G1Point; + fn g1_ref<'a>(&'a self) -> Cow<'a, G1Point>; } impl G1Ref for AggregatePublicKey { - fn g1_ref(&self) -> &G1Point { - &self.as_raw().point + fn g1_ref<'a>(&'a self) -> Cow<'a, G1Point> { + Cow::Borrowed(&self.as_raw().point) } } impl G1Ref for PublicKey { - fn g1_ref(&self) -> &G1Point { - &self.as_raw().point + fn g1_ref<'a>(&'a self) -> Cow<'a, G1Point> { + Cow::Borrowed(&self.as_raw().point) } } diff --git a/eth2/utils/remote_beacon_node/src/lib.rs b/eth2/utils/remote_beacon_node/src/lib.rs index 50dcff59f6..a775ba3a34 100644 --- a/eth2/utils/remote_beacon_node/src/lib.rs +++ b/eth2/utils/remote_beacon_node/src/lib.rs @@ -260,7 +260,10 @@ impl Validator { let bulk_request = BulkValidatorDutiesRequest { epoch, - pubkeys: validator_pubkeys.to_vec(), + pubkeys: validator_pubkeys + .iter() + .map(|pubkey| pubkey.clone().into()) + .collect(), }; self.url("duties") diff --git a/validator_client/src/attestation_service.rs 
b/validator_client/src/attestation_service.rs index daec479468..45cd997984 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -1,8 +1,11 @@ -use crate::{duties_service::DutiesService, validator_store::ValidatorStore}; +use crate::{ + duties_service::{DutiesService, ValidatorDuty}, + validator_store::ValidatorStore, +}; use environment::RuntimeContext; use exit_future::Signal; use futures::{Future, Stream}; -use remote_beacon_node::{PublishStatus, RemoteBeaconNode, ValidatorDuty}; +use remote_beacon_node::{PublishStatus, RemoteBeaconNode}; use slog::{crit, info, trace}; use slot_clock::SlotClock; use std::collections::HashMap; diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index b375eb341c..0146c1e049 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -3,15 +3,16 @@ use environment::RuntimeContext; use exit_future::Signal; use futures::{Future, IntoFuture, Stream}; use parking_lot::RwLock; -use remote_beacon_node::{RemoteBeaconNode, ValidatorDuty}; +use remote_beacon_node::RemoteBeaconNode; use slog::{crit, error, info, trace, warn}; use slot_clock::SlotClock; use std::collections::HashMap; +use std::convert::TryInto; use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::timer::Interval; -use types::{ChainSpec, Epoch, EthSpec, PublicKey, Slot}; +use types::{ChainSpec, CommitteeIndex, Epoch, EthSpec, PublicKey, Slot}; /// Delay this period of time after the slot starts. This allows the node to process the new slot. const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100); @@ -21,6 +22,37 @@ const PRUNE_DEPTH: u64 = 4; type BaseHashMap = HashMap<PublicKey, HashMap<Epoch, ValidatorDuty>>; +/// Stores the duties for some validator for an epoch. +#[derive(PartialEq, Debug, Clone)] +pub struct ValidatorDuty { + /// The validator's BLS public key, uniquely identifying them.
_48-bytes, hex encoded with 0x prefix, case insensitive._ + pub validator_pubkey: PublicKey, + /// The slot at which the validator must attest. + pub attestation_slot: Option<Slot>, + /// The index of the committee within `slot` of which the validator is a member. + pub attestation_committee_index: Option<CommitteeIndex>, + /// The position of the validator in the committee. + pub attestation_committee_position: Option<usize>, + /// The slots in which a validator must propose a block (can be empty). + pub block_proposal_slots: Vec<Slot>, +} + +impl TryInto<ValidatorDuty> for remote_beacon_node::ValidatorDuty { + type Error = String; + + fn try_into(self) -> Result<ValidatorDuty, Self::Error> { + Ok(ValidatorDuty { + validator_pubkey: (&self.validator_pubkey) + .try_into() + .map_err(|e| format!("Invalid pubkey bytes from server: {:?}", e))?, + attestation_slot: self.attestation_slot, + attestation_committee_index: self.attestation_committee_index, + attestation_committee_position: self.attestation_committee_position, + block_proposal_slots: self.block_proposal_slots, + }) + } +} + /// The outcome of inserting some `ValidatorDuty` into the `DutiesStore`. enum InsertOutcome { /// These are the first duties received for this validator.
@@ -345,7 +377,7 @@ impl DutiesService { .get_duties_bulk(epoch, pubkeys.as_slice()) .map(move |all_duties| (epoch, all_duties)) .map_err(move |e| format!("Failed to get duties for epoch {}: {:?}", epoch, e)) - .map(move |(epoch, all_duties)| { + .and_then(move |(epoch, all_duties)| { let log = service_2.context.log.clone(); let mut new_validator = 0; @@ -354,7 +386,9 @@ impl DutiesService { let mut replaced = 0; let mut invalid = 0; - all_duties.into_iter().for_each(|duties| { + all_duties.into_iter().try_for_each::<_, Result<_, String>>(|remote_duties| { + let duties: ValidatorDuty = remote_duties.try_into()?; + match service_2 .store .insert(epoch, duties.clone(), E::slots_per_epoch()) @@ -374,7 +408,9 @@ impl DutiesService { InsertOutcome::Replaced => replaced += 1, InsertOutcome::Invalid => invalid += 1, }; - }); + + Ok(()) + })?; if invalid > 0 { error!( @@ -402,6 +438,8 @@ impl DutiesService { "info" => "Chain re-org likely occurred." ) } + + Ok(()) }) } }