From 6656cb00e446de6e7d4526893d9c4c47bb78ea86 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Thu, 5 Mar 2020 17:19:35 +1100 Subject: [PATCH] Optimize attestation processing (#841) * Start updating types * WIP * Signature hacking * Existing EF tests passing with fake_crypto * Updates * Delete outdated API spec * The refactor continues * It compiles * WIP test fixes * All release tests passing bar genesis state parsing * Update and test YamlConfig * Update to spec v0.10 compatible BLS * Updates to BLS EF tests * Add EF test for AggregateVerify And delete unused hash2curve tests for uncompressed points * Update EF tests to v0.10.1 * Use optional block root correctly in block proc * Use genesis fork in deposit domain. All tests pass * Cargo fmt * Fast aggregate verify test * Update REST API docs * Cargo fmt * Fix unused import * Bump spec tags to v0.10.1 * Add `seconds_per_eth1_block` to chainspec * Update to timestamp based eth1 voting scheme * Return None from `get_votes_to_consider` if block cache is empty * Handle overflows in `is_candidate_block` * Revert to failing tests * Fix eth1 data sets test * Choose default vote according to spec * Fix collect_valid_votes tests * Fix `get_votes_to_consider` to choose all eligible blocks * Uncomment winning_vote tests * Add comments; remove unused code * Reduce seconds_per_eth1_block for simulation * Addressed review comments * Add test for default vote case * Fix logs * Remove unused functions * Meter default eth1 votes * Fix comments * Address review comments; remove unused dependency * Add first attempt at attestation proc. 
re-write * Add version 2 of attestation processing * Minor fixes * Add validator pubkey cache * Make get_indexed_attestation take a committee * Link signature processing into new attn verification * First working version * Ensure pubkey cache is updated * Add more metrics, slight optimizations * Clone committee cache during attestation processing * Update shuffling cache during block processing * Remove old commented-out code * Fix shuffling cache insert bug * Used indexed attestation in fork choice * Restructure attn processing, add metrics * Add more detailed metrics * Tidy, fix failing tests * Fix failing tests, tidy * Disable/delete two outdated tests * Tidy * Add pubkey cache persistence file * Add more comments * Integrate persistence file into builder * Add pubkey cache tests * Add data_dir to beacon chain builder * Remove Option in pubkey cache persistence file * Ensure consistency between datadir/data_dir * Fix failing network test * Tidy * Fix todos * Add attestation processing tests * Add another test * Only run attestation tests in release * Make attestation tests MainnetEthSpec * Address Michael's comments * Remove redundant check * Fix warning * Fix failing test Co-authored-by: Michael Sproul Co-authored-by: Pawan Dhananjay --- Cargo.lock | 1 + beacon_node/beacon_chain/Cargo.toml | 3 +- beacon_node/beacon_chain/src/beacon_chain.rs | 499 +++++++++++------- beacon_node/beacon_chain/src/builder.rs | 48 ++ beacon_node/beacon_chain/src/errors.rs | 8 + beacon_node/beacon_chain/src/fork_choice.rs | 30 +- beacon_node/beacon_chain/src/lib.rs | 2 + beacon_node/beacon_chain/src/metrics.rs | 41 +- .../beacon_chain/src/shuffling_cache.rs | 46 ++ beacon_node/beacon_chain/src/test_utils.rs | 12 + .../src/validator_pubkey_cache.rs | 323 ++++++++++++ .../beacon_chain/tests/attestation_tests.rs | 254 +++++++++ .../beacon_chain/tests/persistence_tests.rs | 12 +- beacon_node/beacon_chain/tests/store_tests.rs | 14 +- beacon_node/beacon_chain/tests/tests.rs | 20 +- 
beacon_node/client/src/builder.rs | 10 +- beacon_node/client/src/config.rs | 4 +- beacon_node/client/src/lib.rs | 2 +- beacon_node/network/src/message_processor.rs | 10 +- beacon_node/network/src/persisted_dht.rs | 12 +- beacon_node/network/src/service.rs | 4 +- beacon_node/network/src/service/tests.rs | 58 +- beacon_node/src/config.rs | 4 +- beacon_node/src/lib.rs | 6 +- eth2/operation_pool/src/attestation.rs | 5 +- eth2/operation_pool/src/attestation_id.rs | 18 +- eth2/operation_pool/src/lib.rs | 37 +- .../src/fork_choice_test_definition.rs | 10 +- .../src/proto_array.rs | 5 + .../src/proto_array_fork_choice.rs | 13 + .../src/common/get_attesting_indices.rs | 8 +- .../src/common/get_indexed_attestation.rs | 5 +- eth2/state_processing/src/lib.rs | 3 +- .../src/per_block_processing.rs | 2 +- .../block_signature_verifier.rs | 6 +- .../per_block_processing/signature_sets.rs | 28 +- .../verify_attestation.rs | 3 +- .../validator_statuses.rs | 4 +- 38 files changed, 1226 insertions(+), 344 deletions(-) create mode 100644 beacon_node/beacon_chain/src/shuffling_cache.rs create mode 100644 beacon_node/beacon_chain/src/validator_pubkey_cache.rs create mode 100644 beacon_node/beacon_chain/tests/attestation_tests.rs diff --git a/Cargo.lock b/Cargo.lock index d9e64d900e..536f886249 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -220,6 +220,7 @@ dependencies = [ "lighthouse_bootstrap 0.1.0", "lighthouse_metrics 0.1.0", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "lru 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "merkle_proof 0.1.0", "operation_pool 0.1.0", "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 7df36537b8..bd8fcb5e97 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -41,8 +41,9 @@ genesis = { path = "../genesis" } integer-sqrt = "0.1" rand = "0.7.2" 
proto_array_fork_choice = { path = "../../eth2/proto_array_fork_choice" } +lru = "0.4.3" +tempfile = "3.1.0" [dev-dependencies] -tempfile = "3.1.0" lazy_static = "1.4.0" environment = { path = "../../lighthouse/environment" } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e32ccff1fb..fc762adbe1 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -6,20 +6,21 @@ use crate::fork_choice::{Error as ForkChoiceError, ForkChoice}; use crate::head_tracker::HeadTracker; use crate::metrics; use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY}; +use crate::shuffling_cache::ShufflingCache; use crate::timeout_rw_lock::TimeoutRwLock; +use crate::validator_pubkey_cache::ValidatorPubkeyCache; use operation_pool::{OperationPool, PersistedOperationPool}; use slog::{debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; use ssz::Encode; -use state_processing::per_block_processing::{ - errors::{ - AttestationValidationError, AttesterSlashingValidationError, ExitValidationError, - ProposerSlashingValidationError, - }, - verify_attestation_for_state, VerifySignatures, +use state_processing::per_block_processing::errors::{ + AttestationValidationError, AttesterSlashingValidationError, ExitValidationError, + ProposerSlashingValidationError, }; use state_processing::{ - per_block_processing, per_slot_processing, BlockProcessingError, BlockSignatureStrategy, + common::get_indexed_attestation, per_block_processing, per_slot_processing, + signature_sets::indexed_attestation_signature_set_from_pubkeys, BlockProcessingError, + BlockSignatureStrategy, }; use std::borrow::Cow; use std::cmp::Ordering; @@ -53,6 +54,14 @@ const MAXIMUM_BLOCK_SLOT_NUMBER: u64 = 4_294_967_296; // 2^32 /// head. 
const HEAD_LOCK_TIMEOUT: Duration = Duration::from_secs(1); +/// The time-out before failure during an operation to take a read/write RwLock on the +/// attestation cache. +const ATTESTATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1); + +/// The time-out before failure during an operation to take a read/write RwLock on the +/// validator pubkey cache. +const VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1); + #[derive(Debug, PartialEq)] pub enum BlockProcessingOutcome { /// Block was valid and imported into the block graph. @@ -93,14 +102,29 @@ pub enum AttestationProcessingOutcome { }, /// The attestation is attesting to a state that is later than itself. (Viz., attesting to the /// future). - AttestsToFutureState { - state: Slot, + AttestsToFutureBlock { + block: Slot, attestation: Slot, }, /// The slot is finalized, no need to import. FinalizedSlot { - attestation: Epoch, - finalized: Epoch, + attestation: Slot, + finalized: Slot, + }, + FutureEpoch { + attestation_epoch: Epoch, + current_epoch: Epoch, + }, + PastEpoch { + attestation_epoch: Epoch, + current_epoch: Epoch, + }, + BadTargetEpoch, + UnknownTargetRoot(Hash256), + InvalidSignature, + NoCommitteeForSlotAndIndex { + slot: Slot, + index: CommitteeIndex, }, Invalid(AttestationValidationError), } @@ -161,6 +185,10 @@ pub struct BeaconChain { pub event_handler: T::EventHandler, /// Used to track the heads of the beacon chain. pub(crate) head_tracker: HeadTracker, + /// Caches the shuffling for a given epoch and state root. + pub(crate) shuffling_cache: TimeoutRwLock, + /// Caches a map of `validator_index -> validator_pubkey`. + pub(crate) validator_pubkey_cache: TimeoutRwLock, /// Logging to CLI, etc. 
pub(crate) log: Logger, } @@ -710,11 +738,15 @@ impl BeaconChain { &self, attestation: Attestation, ) -> Result { + metrics::inc_counter(&metrics::ATTESTATION_PROCESSING_REQUESTS); + let timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_TIMES); + let outcome = self.process_attestation_internal(attestation.clone()); match &outcome { Ok(outcome) => match outcome { AttestationProcessingOutcome::Processed => { + metrics::inc_counter(&metrics::ATTESTATION_PROCESSING_SUCCESSES); trace!( self.log, "Beacon attestation imported"; @@ -756,6 +788,7 @@ impl BeaconChain { } } + metrics::stop_timer(timer); outcome } @@ -763,216 +796,253 @@ impl BeaconChain { &self, attestation: Attestation, ) -> Result { - metrics::inc_counter(&metrics::ATTESTATION_PROCESSING_REQUESTS); - let timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_TIMES); + let initial_validation_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_INITIAL_VALIDATION_TIMES); + // There is no point in processing an attestation with an empty bitfield. Reject + // it immediately. if attestation.aggregation_bits.num_set_bits() == 0 { return Ok(AttestationProcessingOutcome::EmptyAggregationBitfield); } - // From the store, load the attestation's "head block". - // - // An honest validator would have set this block to be the head of the chain (i.e., the - // result of running fork choice). - let result = if let Some(attestation_head_block) = - self.get_block(&attestation.data.beacon_block_root)? - { - // If the attestation points to a block in the same epoch in which it was made, - // then it is sufficient to load the state from that epoch's boundary, because - // the epoch-variable fields like the justified checkpoints cannot have changed - // between the epoch boundary and when the attestation was made. 
If conversely, - // the attestation points to a block in a prior epoch, then it is necessary to - // load the full state corresponding to its block, and transition it to the - // attestation's epoch. - let attestation_block_root = attestation_head_block.state_root(); - let attestation_epoch = attestation.data.target.epoch; - let slots_per_epoch = T::EthSpec::slots_per_epoch(); - let mut state = if attestation_epoch - == attestation_head_block.slot().epoch(slots_per_epoch) - { - self.store - .load_epoch_boundary_state(&attestation_block_root)? - .ok_or_else(|| Error::MissingBeaconState(attestation_block_root))? - } else { - let mut state = self - .get_state_caching_only_with_committee_caches( - &attestation_head_block.state_root(), - Some(attestation_head_block.slot()), - )? - .ok_or_else(|| { - Error::MissingBeaconState(attestation_head_block.state_root()) - })?; + let attestation_epoch = attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()); + let epoch_now = self.epoch()?; + let target = attestation.data.target.clone(); - // Fastforward the state to the epoch in which the attestation was made. - // NOTE: this looks like a potential DoS vector, we should probably limit - // the amount we're willing to fastforward without a valid signature. - for _ in state.slot.as_u64()..attestation_epoch.start_slot(slots_per_epoch).as_u64() - { - // Note: we provide the zero hash as the state root because the state root is - // irrelevant to attestation processing and therefore a waste of time to - // compute. - per_slot_processing(&mut state, Some(Hash256::zero()), &self.spec)?; - } - - state - }; - - state.build_committee_cache(RelativeEpoch::Current, &self.spec)?; - - // Reject any attestation where the `state` loaded from `data.beacon_block_root` - // has a higher slot than the attestation. - // - // Permitting this would allow for attesters to vote on _future_ slots. 
- if state.slot > attestation.data.slot { - Ok(AttestationProcessingOutcome::AttestsToFutureState { - state: state.slot, - attestation: attestation.data.slot, - }) - } else { - self.process_attestation_for_state_and_block( - attestation, - &state, - &attestation_head_block.message, - ) - } - } else { - // Drop any attestation where we have not processed `attestation.data.beacon_block_root`. - // - // This is likely overly restrictive, we could store the attestation for later - // processing. - let head_epoch = self.head_info()?.slot.epoch(T::EthSpec::slots_per_epoch()); - let attestation_epoch = attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()); - - // Only log a warning if our head is in a reasonable place to verify this attestation. - // This avoids excess logging during syncing. - if head_epoch + 1 >= attestation_epoch { - trace!( - self.log, - "Dropped attestation for unknown block"; - "block" => format!("{}", attestation.data.beacon_block_root) - ); - } else { - trace!( - self.log, - "Dropped attestation for unknown block"; - "block" => format!("{}", attestation.data.beacon_block_root) - ); - } - - Ok(AttestationProcessingOutcome::UnknownHeadBlock { - beacon_block_root: attestation.data.beacon_block_root, - }) - }; - - metrics::stop_timer(timer); - - if let Ok(AttestationProcessingOutcome::Processed) = &result { - metrics::inc_counter(&metrics::ATTESTATION_PROCESSING_SUCCESSES); + // Attestation must be from the current or previous epoch. 
+ if attestation_epoch > epoch_now { + return Ok(AttestationProcessingOutcome::FutureEpoch { + attestation_epoch, + current_epoch: epoch_now, + }); + } else if attestation_epoch + 1 < epoch_now { + return Ok(AttestationProcessingOutcome::PastEpoch { + attestation_epoch, + current_epoch: epoch_now, + }); } - result - } + if target.epoch != attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()) { + return Ok(AttestationProcessingOutcome::BadTargetEpoch); + } - /// Verifies the `attestation` against the `state` to which it is attesting. - /// - /// Updates fork choice with any new latest messages, but _does not_ find or update the head. - /// - /// ## Notes - /// - /// The given `state` must fulfil one of the following conditions: - /// - /// - `state` corresponds to the `block.state_root` identified by - /// `attestation.data.beacon_block_root`. (Viz., `attestation` was created using `state`). - /// - `state.slot` is in the same epoch as `data.target.epoch` and - /// `attestation.data.beacon_block_root` is in the history of `state`. - /// - /// Additionally, `attestation.data.beacon_block_root` **must** be available to read in - /// `self.store` _and_ be the root of the given `block`. - /// - /// If the given conditions are not fulfilled, the function may error or provide a false - /// negative (indicating that a given `attestation` is invalid when it is was validly formed). - fn process_attestation_for_state_and_block( - &self, - attestation: Attestation, - state: &BeaconState, - block: &BeaconBlock, - ) -> Result { - // Find the highest between: + // Attestation target must be for a known block. // - // - The highest valid finalized epoch we've ever seen (i.e., the head). - // - The finalized epoch that this attestation was created against. - let finalized_epoch = std::cmp::max( - self.head_info()?.finalized_checkpoint.epoch, - state.finalized_checkpoint.epoch, - ); - - // A helper function to allow attestation processing to be metered. 
- let verify_attestation_for_state = |state, attestation, spec, verify_signatures| { - let timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_CORE); - - let result = verify_attestation_for_state(state, attestation, spec, verify_signatures); - - metrics::stop_timer(timer); - result + // We use fork choice to find the target root, which means that we reject any attestation + // that has a `target.root` earlier than our latest finalized root. There's no point in + // processing an attestation that does not include our latest finalized block in its chain. + // + // We do not delay consideration for later, we simply drop the attestation. + let (target_block_slot, target_block_state_root) = if let Some((slot, state_root)) = + self.fork_choice.block_slot_and_state_root(&target.root) + { + (slot, state_root) + } else { + return Ok(AttestationProcessingOutcome::UnknownTargetRoot(target.root)); }; - if block.slot > 0 && block.slot <= finalized_epoch.start_slot(T::EthSpec::slots_per_epoch()) + // Load the slot and state root for `attestation.data.beacon_block_root`. + // + // This indirectly checks to see if the `attestation.data.beacon_block_root` is in our fork + // choice. Any known, non-finalized block should be in fork choice, so this check + // immediately filters out attestations that attest to a block that has not been processed. + // + // Attestations must be for a known block. If the block is unknown, we simply drop the + // attestation and do not delay consideration for later. + let block_slot = if let Some((slot, _state_root)) = self + .fork_choice + .block_slot_and_state_root(&attestation.data.beacon_block_root) { - // Ignore any attestation where the slot of `data.beacon_block_root` is equal to or - // prior to the finalized epoch. - // - // For any valid attestation if the `beacon_block_root` is prior to finalization, then - // all other parameters (source, target, etc) must all be prior to finalization and - // therefore no longer interesting. 
- // - // We allow the case where the block is the genesis block. Without this, all - // attestations prior to the first block being produced would be invalid. - Ok(AttestationProcessingOutcome::FinalizedSlot { - attestation: block.slot.epoch(T::EthSpec::slots_per_epoch()), - finalized: finalized_epoch, - }) - } else if let Err(e) = - verify_attestation_for_state(state, &attestation, VerifySignatures::True, &self.spec) - { - warn!( - self.log, - "Invalid attestation"; - "state_epoch" => state.current_epoch(), - "error" => format!("{:?}", e), - ); - - Ok(AttestationProcessingOutcome::Invalid(e)) + slot } else { - // If the attestation is from the current or previous epoch, supply it to the fork - // choice. This is FMD GHOST. - let current_epoch = self.epoch()?; - if attestation.data.target.epoch == current_epoch - || attestation.data.target.epoch == current_epoch - 1 - { - // Provide the attestation to fork choice, updating the validator latest messages but - // _without_ finding and updating the head. - if let Err(e) = self.fork_choice.process_attestation(&state, &attestation) { - error!( - self.log, - "Add attestation to fork choice failed"; - "beacon_block_root" => format!("{}", attestation.data.beacon_block_root), - "error" => format!("{:?}", e) - ); - return Err(e.into()); + return Ok(AttestationProcessingOutcome::UnknownHeadBlock { + beacon_block_root: attestation.data.beacon_block_root, + }); + }; + + // TODO: currently we do not check the FFG source/target. This is what the spec dictates + // but it seems wrong. + // + // I have opened an issue on the specs repo for this: + // + // https://github.com/ethereum/eth2.0-specs/issues/1636 + // + // We should revisit this code once that issue has been resolved. + + // Attestations must not be for blocks in the future. If this is the case, the attestation + // should not be considered. 
+ if block_slot > attestation.data.slot { + return Ok(AttestationProcessingOutcome::AttestsToFutureBlock { + block: block_slot, + attestation: attestation.data.slot, + }); + } + + metrics::stop_timer(initial_validation_timer); + + let cache_wait_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SHUFFLING_CACHE_WAIT_TIMES); + + let mut shuffling_cache = self + .shuffling_cache + .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) + .ok_or_else(|| Error::AttestationCacheLockTimeout)?; + + metrics::stop_timer(cache_wait_timer); + + let indexed_attestation = + if let Some(committee_cache) = shuffling_cache.get(attestation_epoch, target.root) { + if let Some(committee) = committee_cache + .get_beacon_committee(attestation.data.slot, attestation.data.index) + { + let indexed_attestation = + get_indexed_attestation(committee.committee, &attestation)?; + + // Drop the shuffling cache to avoid holding the lock for any longer than + // required. + drop(shuffling_cache); + + indexed_attestation + } else { + return Ok(AttestationProcessingOutcome::NoCommitteeForSlotAndIndex { + slot: attestation.data.slot, + index: attestation.data.index, + }); } + } else { + // Drop the shuffling cache to avoid holding the lock for any longer than + // required. + drop(shuffling_cache); + + debug!( + self.log, + "Attestation processing cache miss"; + "attn_epoch" => attestation_epoch.as_u64(), + "head_block_epoch" => block_slot.epoch(T::EthSpec::slots_per_epoch()).as_u64(), + ); + + let state_read_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_READ_TIMES); + + let mut state = self + .get_state_caching_only_with_committee_caches( + &target_block_state_root, + Some(target_block_slot), + )? 
+ .ok_or_else(|| Error::MissingBeaconState(target_block_state_root))?; + + metrics::stop_timer(state_read_timer); + let state_skip_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_SKIP_TIMES); + + while state.current_epoch() + 1 < attestation_epoch { + // Here we tell `per_slot_processing` to skip hashing the state and just + // use the zero hash instead. + // + // The state roots are not useful for the shuffling, so there's no need to + // compute them. + per_slot_processing(&mut state, Some(Hash256::zero()), &self.spec)? + } + + metrics::stop_timer(state_skip_timer); + let committee_building_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_COMMITTEE_BUILDING_TIMES); + + let relative_epoch = + RelativeEpoch::from_epoch(state.current_epoch(), attestation_epoch) + .map_err(Error::IncorrectStateForAttestation)?; + + state.build_committee_cache(relative_epoch, &self.spec)?; + + let committee_cache = state.committee_cache(relative_epoch)?; + + self.shuffling_cache + .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) + .ok_or_else(|| Error::AttestationCacheLockTimeout)? + .insert(attestation_epoch, target.root, committee_cache); + + metrics::stop_timer(committee_building_timer); + + if let Some(committee) = committee_cache + .get_beacon_committee(attestation.data.slot, attestation.data.index) + { + get_indexed_attestation(committee.committee, &attestation)? 
+ } else { + return Ok(AttestationProcessingOutcome::NoCommitteeForSlotAndIndex { + slot: attestation.data.slot, + index: attestation.data.index, + }); + } + }; + + let signature_setup_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SIGNATURE_SETUP_TIMES); + + let pubkey_cache = self + .validator_pubkey_cache + .try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT) + .ok_or_else(|| Error::ValidatorPubkeyCacheLockTimeout)?; + + let pubkeys = indexed_attestation + .attesting_indices + .iter() + .map(|i| { + pubkey_cache + .get(*i as usize) + .ok_or_else(|| Error::ValidatorPubkeyCacheIncomplete(*i as usize)) + }) + .collect::, Error>>()?; + + let fork = self + .canonical_head + .try_read_for(HEAD_LOCK_TIMEOUT) + .ok_or_else(|| Error::CanonicalHeadLockTimeout) + .map(|head| head.beacon_state.fork.clone())?; + + let signature_set = indexed_attestation_signature_set_from_pubkeys( + pubkeys, + &attestation.signature, + &indexed_attestation, + &fork, + &self.spec, + ) + .map_err(Error::SignatureSetError)?; + + metrics::stop_timer(signature_setup_timer); + + let signature_verification_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SIGNATURE_TIMES); + + let signature_is_valid = signature_set.is_valid(); + + metrics::stop_timer(signature_verification_timer); + + if signature_is_valid { + // Provide the attestation to fork choice, updating the validator latest messages but + // _without_ finding and updating the head. + if let Err(e) = self + .fork_choice + .process_indexed_attestation(&indexed_attestation) + { + error!( + self.log, + "Add attestation to fork choice failed"; + "beacon_block_root" => format!("{}", attestation.data.beacon_block_root), + "error" => format!("{:?}", e) + ); + return Err(e.into()); } // Provide the valid attestation to op pool, which may choose to retain the // attestation for inclusion in a future block. 
if self.eth1_chain.is_some() { self.op_pool - .insert_attestation(attestation, state, &self.spec)?; + .insert_attestation(attestation, &fork, &self.spec)?; }; - // Update the metrics. - metrics::inc_counter(&metrics::ATTESTATION_PROCESSING_SUCCESSES); - Ok(AttestationProcessingOutcome::Processed) + } else { + Ok(AttestationProcessingOutcome::InvalidSignature) } } @@ -1289,6 +1359,41 @@ impl BeaconChain { let fork_choice_register_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_FORK_CHOICE_REGISTER); + // If there are new validators in this block, update our pubkey cache. + // + // We perform this _before_ adding the block to fork choice because the pubkey cache is + // used by attestation processing which will only process an attestation if the block is + // known to fork choice. This ordering ensure that the pubkey cache is always up-to-date. + self.validator_pubkey_cache + .try_write_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT) + .ok_or_else(|| Error::ValidatorPubkeyCacheLockTimeout)? + .import_new_pubkeys(&state)?; + + // If the imported block is in the previous or current epochs (according to the + // wall-clock), check to see if this is the first block of the epoch. If so, add the + // committee to the shuffling cache. + if state.current_epoch() + 1 >= self.epoch()? + && parent_block.slot().epoch(T::EthSpec::slots_per_epoch()) != state.current_epoch() + { + let mut shuffling_cache = self + .shuffling_cache + .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) + .ok_or_else(|| Error::AttestationCacheLockTimeout)?; + + let committee_cache = state.committee_cache(RelativeEpoch::Current)?; + + let epoch_start_slot = state + .current_epoch() + .start_slot(T::EthSpec::slots_per_epoch()); + let target_root = if state.slot == epoch_start_slot { + block_root + } else { + *state.get_block_root(epoch_start_slot)? + }; + + shuffling_cache.insert(state.current_epoch(), target_root, committee_cache); + } + // Register the new block with the fork choice service. 
if let Err(e) = self .fork_choice diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index e7052b5398..85527a9f0f 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -2,7 +2,9 @@ use crate::eth1_chain::CachingEth1Backend; use crate::events::NullEventHandler; use crate::head_tracker::HeadTracker; use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY}; +use crate::shuffling_cache::ShufflingCache; use crate::timeout_rw_lock::TimeoutRwLock; +use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{ BeaconChain, BeaconChainTypes, CheckPoint, Eth1Chain, Eth1ChainBackend, EventHandler, ForkChoice, @@ -13,6 +15,7 @@ use proto_array_fork_choice::ProtoArrayForkChoice; use slog::{info, Logger}; use slot_clock::{SlotClock, TestingSlotClock}; use std::marker::PhantomData; +use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use store::Store; @@ -20,6 +23,8 @@ use types::{ BeaconBlock, BeaconState, ChainSpec, EthSpec, Hash256, Signature, SignedBeaconBlock, Slot, }; +pub const PUBKEY_CACHE_FILENAME: &str = "pubkey_cache.ssz"; + /// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing /// functionality and only exists to satisfy the type system. pub struct Witness( @@ -73,6 +78,9 @@ pub struct BeaconChainBuilder { slot_clock: Option, persisted_beacon_chain: Option>, head_tracker: Option, + data_dir: Option, + pubkey_cache_path: Option, + validator_pubkey_cache: Option, spec: ChainSpec, log: Option, } @@ -106,6 +114,9 @@ where slot_clock: None, persisted_beacon_chain: None, head_tracker: None, + pubkey_cache_path: None, + data_dir: None, + validator_pubkey_cache: None, spec: TEthSpec::default_spec(), log: None, } @@ -142,6 +153,15 @@ where self } + /// Sets the location to the pubkey cache file. + /// + /// Should generally be called early in the build chain. 
+ pub fn data_dir(mut self, path: PathBuf) -> Self { + self.pubkey_cache_path = Some(path.join(PUBKEY_CACHE_FILENAME)); + self.data_dir = Some(path); + self + } + /// Attempt to load an existing chain from the builder's `Store`. /// /// May initialize several components; including the op_pool and finalized checkpoints. @@ -151,6 +171,11 @@ where .as_ref() .ok_or_else(|| "resume_from_db requires a log".to_string())?; + let pubkey_cache_path = self + .pubkey_cache_path + .as_ref() + .ok_or_else(|| "resume_from_db requires a data_dir".to_string())?; + info!( log, "Starting beacon chain"; @@ -194,6 +219,11 @@ where }; self.persisted_beacon_chain = Some(p); + let pubkey_cache = ValidatorPubkeyCache::load_from_file(pubkey_cache_path) + .map_err(|e| format!("Unable to open persisted pubkey cache: {:?}", e))?; + + self.validator_pubkey_cache = Some(pubkey_cache); + Ok(self) } @@ -308,6 +338,18 @@ where return Err("beacon_block.state_root != beacon_state".to_string()); } + let pubkey_cache_path = self + .pubkey_cache_path + .ok_or_else(|| "Cannot build without a pubkey cache path".to_string())?; + + let validator_pubkey_cache = self + .validator_pubkey_cache + .map(|cache| Ok(cache)) + .unwrap_or_else(|| { + ValidatorPubkeyCache::new(&canonical_head.beacon_state, pubkey_cache_path) + .map_err(|e| format!("Unable to init validator pubkey cache: {:?}", e)) + })?; + let beacon_chain = BeaconChain { spec: self.spec, store: self @@ -334,6 +376,8 @@ where .event_handler .ok_or_else(|| "Cannot build without an event handler".to_string())?, head_tracker: self.head_tracker.unwrap_or_default(), + shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()), + validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache), log: log.clone(), }; @@ -384,6 +428,7 @@ where let backend = ProtoArrayForkChoice::new( finalized_checkpoint.beacon_block.message.slot, + finalized_checkpoint.beacon_block.message.state_root, // Note: here we set the `justified_epoch` to be the same as the epoch 
of the // finalized checkpoint. Whilst this finalized checkpoint may actually point to // a _later_ justified checkpoint, that checkpoint won't yet exist in the fork @@ -539,6 +584,7 @@ mod test { use ssz::Encode; use std::time::Duration; use store::{migrate::NullMigrator, MemoryStore}; + use tempfile::tempdir; use types::{EthSpec, MinimalEthSpec, Slot}; type TestEthSpec = MinimalEthSpec; @@ -556,6 +602,7 @@ mod test { let log = get_logger(); let store = Arc::new(MemoryStore::open()); let spec = MinimalEthSpec::default_spec(); + let data_dir = tempdir().expect("should create temporary data_dir"); let genesis_state = interop_genesis_state( &generate_deterministic_keypairs(validator_count), @@ -568,6 +615,7 @@ mod test { .logger(log.clone()) .store(store) .store_migrator(NullMigrator) + .data_dir(data_dir.path().to_path_buf()) .genesis_state(genesis_state) .expect("should build state using recent genesis") .dummy_eth1_backend() diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 2753476e7c..1470d7c2a4 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -1,6 +1,7 @@ use crate::eth1_chain::Error as Eth1ChainError; use crate::fork_choice::Error as ForkChoiceError; use operation_pool::OpPoolError; +use ssz::DecodeError; use ssz_types::Error as SszTypesError; use state_processing::per_block_processing::errors::AttestationValidationError; use state_processing::BlockProcessingError; @@ -51,6 +52,13 @@ pub enum BeaconChainError { InvariantViolated(String), SszTypesError(SszTypesError), CanonicalHeadLockTimeout, + AttestationCacheLockTimeout, + ValidatorPubkeyCacheLockTimeout, + IncorrectStateForAttestation(RelativeEpochError), + InvalidValidatorPubkeyBytes(DecodeError), + ValidatorPubkeyCacheIncomplete(usize), + SignatureSetError(state_processing::signature_sets::Error), + ValidatorPubkeyCacheFileError(String), } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git 
a/beacon_node/beacon_chain/src/fork_choice.rs b/beacon_node/beacon_chain/src/fork_choice.rs index 6985a913b8..9521fa9cfe 100644 --- a/beacon_node/beacon_chain/src/fork_choice.rs +++ b/beacon_node/beacon_chain/src/fork_choice.rs @@ -5,10 +5,10 @@ use checkpoint_manager::{get_effective_balances, CheckpointManager, CheckpointWi use parking_lot::{RwLock, RwLockReadGuard}; use proto_array_fork_choice::{core::ProtoArray, ProtoArrayForkChoice}; use ssz_derive::{Decode, Encode}; -use state_processing::common::get_attesting_indices; +use state_processing::common::get_indexed_attestation; use std::marker::PhantomData; use store::Error as StoreError; -use types::{Attestation, BeaconBlock, BeaconState, BeaconStateError, Epoch, Hash256}; +use types::{BeaconBlock, BeaconState, BeaconStateError, Epoch, Hash256, IndexedAttestation, Slot}; type Result = std::result::Result; @@ -24,6 +24,7 @@ pub enum Error { UnknownJustifiedBlock(Hash256), UnknownJustifiedState(Hash256), UnableToJsonEncode(String), + InvalidAttestation, } pub struct ForkChoice { @@ -105,6 +106,11 @@ impl ForkChoice { self.backend.contains_block(block_root) } + /// Returns the slot and state root for the given block root. + pub fn block_slot_and_state_root(&self, block_root: &Hash256) -> Option<(Slot, Hash256)> { + self.backend.block_slot_and_state_root(block_root) + } + /// Process all attestations in the given `block`. /// /// Assumes the block (and therefore its attestations) are valid. 
It is a logic error to @@ -133,7 +139,12 @@ impl ForkChoice { .backend .contains_block(&attestation.data.beacon_block_root) { - self.process_attestation(state, attestation)?; + let committee = + state.get_beacon_committee(attestation.data.slot, attestation.data.index)?; + let indexed_attestation = + get_indexed_attestation(committee.committee, &attestation) + .map_err(|_| Error::InvalidAttestation)?; + self.process_indexed_attestation(&indexed_attestation)?; } } @@ -143,6 +154,7 @@ impl ForkChoice { block.slot, block_root, block.parent_root, + block.state_root, state.current_justified_checkpoint.epoch, state.finalized_checkpoint.epoch, )?; @@ -155,10 +167,9 @@ impl ForkChoice { /// Process an attestation which references `block` in `attestation.data.beacon_block_root`. /// /// Assumes the attestation is valid. - pub fn process_attestation( + pub fn process_indexed_attestation( &self, - state: &BeaconState, - attestation: &Attestation, + attestation: &IndexedAttestation, ) -> Result<()> { let timer = metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_ATTESTATION_TIMES); @@ -180,12 +191,9 @@ impl ForkChoice { // // Additionally, don't add any block hash to fork choice unless we have imported the block. 
if block_hash != Hash256::zero() { - let validator_indices = - get_attesting_indices(state, &attestation.data, &attestation.aggregation_bits)?; - - for validator_index in validator_indices { + for validator_index in attestation.attesting_indices.iter() { self.backend.process_attestation( - validator_index, + *validator_index as usize, block_hash, attestation.data.target.epoch, )?; diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index c04995fc0f..cc4a9b8f5c 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -12,8 +12,10 @@ mod fork_choice; mod head_tracker; mod metrics; mod persisted_beacon_chain; +mod shuffling_cache; pub mod test_utils; mod timeout_rw_lock; +mod validator_pubkey_cache; pub use self::beacon_chain::{ AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BlockProcessingOutcome, diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index cee354a230..54331016b5 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -86,10 +86,42 @@ lazy_static! 
{ "beacon_attestation_processing_seconds", "Full runtime of attestation processing" ); - pub static ref ATTESTATION_PROCESSING_CORE: Result = try_create_histogram( - "beacon_attestation_processing_core_seconds", - "Time spent on the core spec processing of attestation processing" + pub static ref ATTESTATION_PROCESSING_INITIAL_VALIDATION_TIMES: Result = try_create_histogram( + "beacon_attestation_processing_initial_validation_seconds", + "Time spent on the initial_validation of attestation processing" ); + pub static ref ATTESTATION_PROCESSING_SHUFFLING_CACHE_WAIT_TIMES: Result = try_create_histogram( + "beacon_attestation_processing_shuffling_cache_wait_seconds", + "Time spent on waiting for the shuffling cache lock during attestation processing" ); + pub static ref ATTESTATION_PROCESSING_COMMITTEE_BUILDING_TIMES: Result = try_create_histogram( + "beacon_attestation_processing_committee_building_seconds", + "Time spent on building committees during attestation processing" ); + pub static ref ATTESTATION_PROCESSING_STATE_READ_TIMES: Result = try_create_histogram( + "beacon_attestation_processing_state_read_seconds", + "Time spent on reading the state during attestation processing" ); + pub static ref ATTESTATION_PROCESSING_STATE_SKIP_TIMES: Result = try_create_histogram( + "beacon_attestation_processing_state_skip_seconds", + "Time spent skipping slots on a state during attestation processing" ); + pub static ref ATTESTATION_PROCESSING_SIGNATURE_SETUP_TIMES: Result = try_create_histogram( + "beacon_attestation_processing_signature_setup_seconds", + "Time spent on setting up for the signature verification of attestation processing" ); + pub static ref ATTESTATION_PROCESSING_SIGNATURE_TIMES: Result = try_create_histogram( + "beacon_attestation_processing_signature_seconds", + "Time spent on the signature verification of attestation processing" ); + + /* + * Shuffling cache + */ + pub static ref SHUFFLING_CACHE_HITS: Result = + 
try_create_int_counter("beacon_shuffling_cache_hits_total", "Count of times shuffling cache fulfils request"); + pub static ref SHUFFLING_CACHE_MISSES: Result = + try_create_int_counter("beacon_shuffling_cache_misses_total", "Count of times shuffling cache fails to fulfil request"); /* * Attestation Production @@ -106,7 +138,10 @@ lazy_static! { "beacon_attestation_production_seconds", "Full runtime of attestation production" ); +} + +// Second lazy-static block is used to account for macro recursion limit. +lazy_static! { /* * Fork Choice */ diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs new file mode 100644 index 0000000000..d8b6e8706e --- /dev/null +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -0,0 +1,46 @@ +use crate::metrics; +use lru::LruCache; +use types::{beacon_state::CommitteeCache, Epoch, Hash256}; + +/// The size of the LRU cache that stores committee caches for quicker verification. +/// +/// Each entry should be `8 + 800,000 = 800,008` bytes in size with 100k validators. (8-byte hash + +/// 100k indices). Therefore, this cache should be approx `16 * 800,008 = 12.8 MB`. (Note: this +/// ignores a few extra bytes in the caches that should be insignificant compared to the indices). +const CACHE_SIZE: usize = 16; + +/// Provides an LRU cache for `CommitteeCache`. +/// +/// It has been named `ShufflingCache` because `CommitteeCacheCache` is a bit weird and looks like +/// a find/replace error. 
+pub struct ShufflingCache { + cache: LruCache<(Epoch, Hash256), CommitteeCache>, +} + +impl ShufflingCache { + pub fn new() -> Self { + Self { + cache: LruCache::new(CACHE_SIZE), + } + } + + pub fn get(&mut self, epoch: Epoch, root: Hash256) -> Option<&CommitteeCache> { + let opt = self.cache.get(&(epoch, root)); + + if opt.is_some() { + metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); + } else { + metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES); + } + + opt + } + + pub fn insert(&mut self, epoch: Epoch, root: Hash256, committee_cache: &CommitteeCache) { + let key = (epoch, root); + + if !self.cache.contains(&key) { + self.cache.put(key, committee_cache.clone()); + } + } +} diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index c2c8cb2b8d..026cc8a32d 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -18,6 +18,7 @@ use store::{ migrate::{BlockingMigrator, NullMigrator}, DiskStore, MemoryStore, Migrate, Store, }; +use tempfile::{tempdir, TempDir}; use types::{ AggregateSignature, Attestation, BeaconState, ChainSpec, Domain, EthSpec, Hash256, Keypair, SecretKey, Signature, SignedBeaconBlock, SignedRoot, Slot, @@ -76,11 +77,13 @@ pub struct BeaconChainHarness { pub chain: BeaconChain, pub keypairs: Vec, pub spec: ChainSpec, + pub data_dir: TempDir, } impl BeaconChainHarness> { /// Instantiate a new harness with `validator_count` initial validators. 
pub fn new(eth_spec_instance: E, keypairs: Vec) -> Self { + let data_dir = tempdir().expect("should create temporary data_dir"); let spec = E::default_spec(); let log = NullLoggerBuilder.build().expect("logger should build"); @@ -90,6 +93,7 @@ impl BeaconChainHarness> { .custom_spec(spec.clone()) .store(Arc::new(MemoryStore::open())) .store_migrator(NullMigrator) + .data_dir(data_dir.path().to_path_buf()) .genesis_state( interop_genesis_state::(&keypairs, HARNESS_GENESIS_TIME, &spec) .expect("should generate interop state"), @@ -109,6 +113,7 @@ impl BeaconChainHarness> { spec: chain.spec.clone(), chain, keypairs, + data_dir, } } } @@ -120,6 +125,7 @@ impl BeaconChainHarness> { store: Arc>, keypairs: Vec, ) -> Self { + let data_dir = tempdir().expect("should create temporary data_dir"); let spec = E::default_spec(); let log = NullLoggerBuilder.build().expect("logger should build"); @@ -129,6 +135,7 @@ impl BeaconChainHarness> { .custom_spec(spec.clone()) .store(store.clone()) .store_migrator( as Migrate<_, E>>::new(store)) + .data_dir(data_dir.path().to_path_buf()) .genesis_state( interop_genesis_state::(&keypairs, HARNESS_GENESIS_TIME, &spec) .expect("should generate interop state"), @@ -148,6 +155,7 @@ impl BeaconChainHarness> { spec: chain.spec.clone(), chain, keypairs, + data_dir, } } @@ -156,6 +164,7 @@ impl BeaconChainHarness> { eth_spec_instance: E, store: Arc>, keypairs: Vec, + data_dir: TempDir, ) -> Self { let spec = E::default_spec(); @@ -166,6 +175,7 @@ impl BeaconChainHarness> { .custom_spec(spec) .store(store.clone()) .store_migrator( as Migrate<_, E>>::new(store)) + .data_dir(data_dir.path().to_path_buf()) .resume_from_db(Eth1Config::default()) .expect("should resume beacon chain from db") .dummy_eth1_backend() @@ -182,6 +192,7 @@ impl BeaconChainHarness> { spec: chain.spec.clone(), chain, keypairs, + data_dir, } } } @@ -400,6 +411,7 @@ where let message = attestation.data.signing_root(domain); let mut agg_sig = AggregateSignature::new(); + 
agg_sig.add(&Signature::new( message.as_bytes(), self.get_sk(*validator_index), diff --git a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs new file mode 100644 index 0000000000..c621779216 --- /dev/null +++ b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs @@ -0,0 +1,323 @@ +use crate::errors::BeaconChainError; +use ssz::{Decode, DecodeError, Encode}; +use std::convert::TryInto; +use std::fs::{File, OpenOptions}; +use std::io::{self, Read, Write}; +use std::path::Path; +use types::{BeaconState, EthSpec, PublicKey, PublicKeyBytes}; + +/// Provides a mapping of `validator_index -> validator_publickey`. +/// +/// This cache exists for two reasons: +/// +/// 1. To avoid reading a `BeaconState` from disk each time we need a public key. +/// 2. To reduce the amount of public key _decompression_ required. A `BeaconState` stores public +/// keys in compressed form and they are needed in decompressed form for signature verification. +/// Decompression is expensive when many keys are involved. +/// +/// The cache has a `persistence_file` that it uses to maintain a persistent, on-disk +/// copy of itself. This allows it to be restored between process invocations. +pub struct ValidatorPubkeyCache { + pubkeys: Vec, + persitence_file: ValidatorPubkeyCacheFile, +} + +impl ValidatorPubkeyCache { + pub fn load_from_file>(path: P) -> Result { + ValidatorPubkeyCacheFile::open(&path) + .and_then(ValidatorPubkeyCacheFile::into_cache) + .map_err(Into::into) + } + + /// Create a new public key cache using the keys in `state.validators`. + /// + /// Also creates a new persistence file, returning an error if there is already a file at + /// `persistence_path`. 
+ pub fn new>( + state: &BeaconState, + persistence_path: P, + ) -> Result { + if persistence_path.as_ref().exists() { + return Err(BeaconChainError::ValidatorPubkeyCacheFileError(format!( + "Persistence file already exists: {:?}", + persistence_path.as_ref() + ))); + } + + let mut cache = Self { + persitence_file: ValidatorPubkeyCacheFile::create(persistence_path)?, + pubkeys: vec![], + }; + + cache.import_new_pubkeys(state)?; + + Ok(cache) + } + + /// Scan the given `state` and add any new validator public keys. + /// + /// Does not delete any keys from `self` if they don't appear in `state`. + pub fn import_new_pubkeys( + &mut self, + state: &BeaconState, + ) -> Result<(), BeaconChainError> { + state + .validators + .iter() + .skip(self.pubkeys.len()) + .try_for_each(|v| { + let i = self.pubkeys.len(); + + // The item is written to disk (the persistence file) _before_ it is written into + // the local struct. + // + // This means that a pubkey cache read from disk will always be equivalent to or + // _later than_ the cache that was running in the previous instance of Lighthouse. + // + // The motivation behind this ordering is that we do not want to have states that + // reference a pubkey that is not in our cache. However, it's fine to have pubkeys + // that are never referenced in a state. + self.persitence_file.append(i, &v.pubkey)?; + + self.pubkeys.push( + (&v.pubkey) + .try_into() + .map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?, + ); + + Ok(()) + }) + } + + /// Get the public key for a validator with index `i`. + pub fn get(&self, i: usize) -> Option<&PublicKey> { + self.pubkeys.get(i) + } +} + +/// Allows for maintaining an on-disk copy of the `ValidatorPubkeyCache`. The file is raw SSZ bytes +/// (not ASCII encoded). +/// +/// ## Writes +/// +/// Each entry is simply appended to the file. +/// +/// ## Reads +/// +/// The whole file is parsed as an SSZ "variable list" of objects. 
+/// +/// This parsing method is possible because the items in the list are fixed-length SSZ objects. +struct ValidatorPubkeyCacheFile(File); + +#[derive(Debug)] +enum Error { + IoError(io::Error), + SszError(DecodeError), + /// The file read from disk does not have a contiguous list of validator public keys. The file + /// has become corrupted. + InconsistentIndex { + expected: Option, + found: usize, + }, +} + +impl From for BeaconChainError { + fn from(e: Error) -> BeaconChainError { + BeaconChainError::ValidatorPubkeyCacheFileError(format!("{:?}", e)) + } +} + +impl ValidatorPubkeyCacheFile { + /// Creates a file for reading and writing. + pub fn create>(path: P) -> Result { + OpenOptions::new() + .create_new(true) + .write(true) + .open(path) + .map(Self) + .map_err(Error::IoError) + } + + /// Opens an existing file for reading and writing. + pub fn open>(path: P) -> Result { + OpenOptions::new() + .read(true) + .write(true) + .create(false) + .append(true) + .open(path) + .map(Self) + .map_err(Error::IoError) + } + + /// Append a public key to file. + /// + /// Each provided `index` should be one greater than the previous, starting at 0. + /// Otherwise, the file will become corrupted and unable to be converted into a cache. + pub fn append(&mut self, index: usize, pubkey: &PublicKeyBytes) -> Result<(), Error> { + append_to_file(&mut self.0, index, pubkey) + } + + /// Creates a `ValidatorPubkeyCache` by reading and parsing the underlying file. 
+ pub fn into_cache(mut self) -> Result { + let mut bytes = vec![]; + self.0.read_to_end(&mut bytes).map_err(Error::IoError)?; + + let list: Vec<(usize, PublicKeyBytes)> = + Vec::from_ssz_bytes(&bytes).map_err(Error::SszError)?; + + let mut last = None; + let mut pubkeys = Vec::with_capacity(list.len()); + + for (index, pubkey) in list { + let expected = last.map(|n| n + 1); + if expected.map_or(true, |expected| index == expected) { + last = Some(index); + pubkeys.push((&pubkey).try_into().map_err(Error::SszError)?); + } else { + return Err(Error::InconsistentIndex { + expected, + found: index, + }); + } + } + + Ok(ValidatorPubkeyCache { + pubkeys, + persitence_file: self, + }) + } +} + +fn append_to_file(file: &mut File, index: usize, pubkey: &PublicKeyBytes) -> Result<(), Error> { + let mut line = Vec::with_capacity(index.ssz_bytes_len() + pubkey.ssz_bytes_len()); + + index.ssz_append(&mut line); + pubkey.ssz_append(&mut line); + + file.write_all(&mut line).map_err(Error::IoError) +} + +#[cfg(test)] +mod test { + use super::*; + use tempfile::tempdir; + use types::{ + test_utils::{generate_deterministic_keypair, TestingBeaconStateBuilder}, + BeaconState, EthSpec, Keypair, MainnetEthSpec, + }; + + fn get_state(validator_count: usize) -> (BeaconState, Vec) { + let spec = MainnetEthSpec::default_spec(); + let builder = + TestingBeaconStateBuilder::from_deterministic_keypairs(validator_count, &spec); + builder.build() + } + + fn check_cache_get(cache: &ValidatorPubkeyCache, keypairs: &[Keypair]) { + let validator_count = keypairs.len(); + + for i in 0..validator_count + 1 { + if i < validator_count { + let pubkey = cache.get(i).expect("pubkey should be present"); + assert_eq!(pubkey, &keypairs[i].pk, "pubkey should match cache"); + } else { + assert_eq!( + cache.get(i), + None, + "should not get pubkey for out of bounds index", + ); + } + } + } + + #[test] + fn basic_operation() { + let (state, keypairs) = get_state(8); + + let dir = tempdir().expect("should create 
tempdir"); + let path = dir.path().join("cache.ssz"); + + let mut cache = ValidatorPubkeyCache::new(&state, path).expect("should create cache"); + + check_cache_get(&cache, &keypairs[..]); + + // Try adding a state with the same number of keypairs. + let (state, keypairs) = get_state(8); + cache + .import_new_pubkeys(&state) + .expect("should import pubkeys"); + check_cache_get(&cache, &keypairs[..]); + + // Try adding a state with less keypairs. + let (state, _) = get_state(1); + cache + .import_new_pubkeys(&state) + .expect("should import pubkeys"); + check_cache_get(&cache, &keypairs[..]); + + // Try adding a state with more keypairs. + let (state, keypairs) = get_state(12); + cache + .import_new_pubkeys(&state) + .expect("should import pubkeys"); + check_cache_get(&cache, &keypairs[..]); + } + + #[test] + fn persistence() { + let (state, keypairs) = get_state(8); + + let dir = tempdir().expect("should create tempdir"); + let path = dir.path().join("cache.ssz"); + + // Create a new cache. + let cache = ValidatorPubkeyCache::new(&state, &path).expect("should create cache"); + check_cache_get(&cache, &keypairs[..]); + drop(cache); + + // Re-init the cache from the file. + let mut cache = ValidatorPubkeyCache::load_from_file(&path).expect("should open cache"); + check_cache_get(&cache, &keypairs[..]); + + // Add some more keypairs. + let (state, keypairs) = get_state(12); + cache + .import_new_pubkeys(&state) + .expect("should import pubkeys"); + check_cache_get(&cache, &keypairs[..]); + drop(cache); + + // Re-init the cache from the file. 
+ let cache = ValidatorPubkeyCache::load_from_file(&path).expect("should open cache"); + check_cache_get(&cache, &keypairs[..]); + } + + #[test] + fn invalid_persisted_file() { + let dir = tempdir().expect("should create tempdir"); + let path = dir.path().join("cache.ssz"); + let pubkey = generate_deterministic_keypair(0).pk.into(); + + let mut file = File::create(&path).expect("should create file"); + append_to_file(&mut file, 0, &pubkey).expect("should write to file"); + drop(file); + + let cache = ValidatorPubkeyCache::load_from_file(&path).expect("should open cache"); + drop(cache); + + let mut file = OpenOptions::new() + .write(true) + .append(true) + .open(&path) + .expect("should open file"); + + append_to_file(&mut file, 42, &pubkey).expect("should write bad data to file"); + drop(file); + + assert!( + ValidatorPubkeyCache::load_from_file(&path).is_err(), + "should not parse invalid file" + ); + } +} diff --git a/beacon_node/beacon_chain/tests/attestation_tests.rs b/beacon_node/beacon_chain/tests/attestation_tests.rs new file mode 100644 index 0000000000..52ccd67c48 --- /dev/null +++ b/beacon_node/beacon_chain/tests/attestation_tests.rs @@ -0,0 +1,254 @@ +#![cfg(not(debug_assertions))] + +#[macro_use] +extern crate lazy_static; + +use beacon_chain::test_utils::{ + AttestationStrategy, BeaconChainHarness, BlockStrategy, HarnessType, +}; +use beacon_chain::AttestationProcessingOutcome; +use state_processing::per_slot_processing; +use types::{ + test_utils::generate_deterministic_keypair, AggregateSignature, BitList, EthSpec, Hash256, + Keypair, MainnetEthSpec, Signature, +}; + +pub const VALIDATOR_COUNT: usize = 128; + +lazy_static! { + /// A cached set of keys. 
+ static ref KEYPAIRS: Vec = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); +} + +fn get_harness(validator_count: usize) -> BeaconChainHarness> { + let harness = BeaconChainHarness::new(MainnetEthSpec, KEYPAIRS[0..validator_count].to_vec()); + + harness.advance_slot(); + + harness +} + +#[test] +fn attestation_validity() { + let harness = get_harness(VALIDATOR_COUNT); + let chain = &harness.chain; + + // Extend the chain out a few epochs so we have some chain depth to play with. + harness.extend_chain( + MainnetEthSpec::slots_per_epoch() as usize * 3 + 1, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + + let head = chain.head().expect("should get head"); + let current_slot = chain.slot().expect("should get slot"); + let current_epoch = chain.epoch().expect("should get epoch"); + + let valid_attestation = harness + .get_free_attestations( + &AttestationStrategy::AllValidators, + &head.beacon_state, + head.beacon_block_root, + head.beacon_block.slot(), + ) + .first() + .cloned() + .expect("should get at least one attestation"); + + assert_eq!( + chain.process_attestation(valid_attestation.clone()), + Ok(AttestationProcessingOutcome::Processed), + "should accept valid attestation" + ); + + /* + * Should reject attestations if the slot does not match the target epoch. + */ + + let mut epoch_mismatch_attestation = valid_attestation.clone(); + epoch_mismatch_attestation.data.target.epoch = current_epoch + 1; + + assert_eq!( + harness + .chain + .process_attestation(epoch_mismatch_attestation), + Ok(AttestationProcessingOutcome::BadTargetEpoch), + "should not accept attestation where the slot is not in the same epoch as the target" + ); + + /* + * Should reject attestations from future epochs. 
+ */ + + let mut early_attestation = valid_attestation.clone(); + early_attestation.data.target.epoch = current_epoch + 1; + early_attestation.data.slot = (current_epoch + 1).start_slot(MainnetEthSpec::slots_per_epoch()); + + assert_eq!( + harness.chain.process_attestation(early_attestation), + Ok(AttestationProcessingOutcome::FutureEpoch { + attestation_epoch: current_epoch + 1, + current_epoch + }), + "should not accept early attestation" + ); + + /* + * Should reject attestations from epochs prior to the previous epoch. + */ + + let late_slot = (current_epoch - 2).start_slot(MainnetEthSpec::slots_per_epoch()); + let late_block = chain + .block_at_slot(late_slot) + .expect("should not error getting block at slot") + .expect("should find block at slot"); + let late_state = chain + .get_state(&late_block.state_root(), Some(late_slot)) + .expect("should not error getting state") + .expect("should find state"); + let late_attestation = harness + .get_free_attestations( + &AttestationStrategy::AllValidators, + &late_state, + late_block.canonical_root(), + late_slot, + ) + .first() + .cloned() + .expect("should get at least one late attestation"); + + assert_eq!( + harness.chain.process_attestation(late_attestation), + Ok(AttestationProcessingOutcome::PastEpoch { + attestation_epoch: current_epoch - 2, + current_epoch + }), + "should not accept late attestation" + ); + + /* + * Should reject attestations if the target is unknown. + */ + + let mut bad_target_attestation = valid_attestation.clone(); + bad_target_attestation.data.target.root = Hash256::from_low_u64_be(42); + + assert_eq!( + harness.chain.process_attestation(bad_target_attestation), + Ok(AttestationProcessingOutcome::UnknownTargetRoot( + Hash256::from_low_u64_be(42) + )), + "should not accept bad_target attestation" + ); + + /* + * Should reject attestations if the target is unknown. 
+ */ + + let mut future_block_attestation = valid_attestation.clone(); + future_block_attestation.data.slot -= 1; + + assert_eq!( + harness.chain.process_attestation(future_block_attestation), + Ok(AttestationProcessingOutcome::AttestsToFutureBlock { + block: current_slot, + attestation: current_slot - 1 + }), + "should not accept future_block attestation" + ); + + /* + * Should reject attestations if the target is unknown. + */ + + let mut bad_head_attestation = valid_attestation.clone(); + bad_head_attestation.data.beacon_block_root = Hash256::from_low_u64_be(42); + + assert_eq!( + harness.chain.process_attestation(bad_head_attestation), + Ok(AttestationProcessingOutcome::UnknownHeadBlock { + beacon_block_root: Hash256::from_low_u64_be(42) + }), + "should not accept bad_head attestation" + ); + + /* + * Should reject attestations with a bad signature. + */ + + let mut bad_signature_attestation = valid_attestation.clone(); + let kp = generate_deterministic_keypair(0); + let mut agg_sig = AggregateSignature::new(); + agg_sig.add(&Signature::new(&[42, 42], &kp.sk)); + bad_signature_attestation.signature = agg_sig; + + assert_eq!( + harness.chain.process_attestation(bad_signature_attestation), + Ok(AttestationProcessingOutcome::InvalidSignature), + "should not accept bad_signature attestation" + ); + + /* + * Should reject attestations with an empty bitfield. + */ + + let mut empty_bitfield_attestation = valid_attestation.clone(); + empty_bitfield_attestation.aggregation_bits = + BitList::with_capacity(1).expect("should build bitfield"); + + assert_eq!( + harness + .chain + .process_attestation(empty_bitfield_attestation), + Ok(AttestationProcessingOutcome::EmptyAggregationBitfield), + "should not accept empty_bitfield attestation" + ); +} + +#[test] +fn attestation_that_skips_epochs() { + let harness = get_harness(VALIDATOR_COUNT); + let chain = &harness.chain; + + // Extend the chain out a few epochs so we have some chain depth to play with. 
+ harness.extend_chain( + MainnetEthSpec::slots_per_epoch() as usize * 3 + 1, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + + let current_slot = chain.slot().expect("should get slot"); + let current_epoch = chain.epoch().expect("should get epoch"); + + let earlier_slot = (current_epoch - 2).start_slot(MainnetEthSpec::slots_per_epoch()); + let earlier_block = chain + .block_at_slot(earlier_slot) + .expect("should not error getting block at slot") + .expect("should find block at slot"); + + let mut state = chain + .get_state(&earlier_block.state_root(), Some(earlier_slot)) + .expect("should not error getting state") + .expect("should find state"); + + while state.slot < current_slot { + per_slot_processing(&mut state, None, &harness.spec).expect("should process slot"); + } + + let attestation = harness + .get_free_attestations( + &AttestationStrategy::AllValidators, + &state, + earlier_block.canonical_root(), + current_slot, + ) + .first() + .cloned() + .expect("should get at least one attestation"); + + assert_eq!( + harness.chain.process_attestation(attestation), + Ok(AttestationProcessingOutcome::Processed), + "should process attestation that skips slots" + ); +} diff --git a/beacon_node/beacon_chain/tests/persistence_tests.rs b/beacon_node/beacon_chain/tests/persistence_tests.rs index 73d399b23c..385408079e 100644 --- a/beacon_node/beacon_chain/tests/persistence_tests.rs +++ b/beacon_node/beacon_chain/tests/persistence_tests.rs @@ -74,19 +74,17 @@ fn finalizes_after_resuming_from_db() { harness.chain.persist().expect("should persist the chain"); + let data_dir = harness.data_dir; + let original_chain = harness.chain; + let resumed_harness = BeaconChainHarness::resume_from_disk_store( MinimalEthSpec, store, KEYPAIRS[0..validator_count].to_vec(), + data_dir, ); - assert_chains_pretty_much_the_same(&harness.chain, &resumed_harness.chain); - - // Ensures we don't accidentally use it again. 
- // - // Note: this will persist the chain again, but that shouldn't matter since nothing has - // changed. - drop(harness); + assert_chains_pretty_much_the_same(&original_chain, &resumed_harness.chain); // Set the slot clock of the resumed harness to be in the slot following the previous harness. // diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 551c079691..ea36a0125f 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -307,13 +307,19 @@ fn epoch_boundary_state_attestation_processing() { let res = harness .chain .process_attestation_internal(attestation.clone()); - if attestation.data.slot <= finalized_epoch.start_slot(E::slots_per_epoch()) { + + let current_epoch = harness.chain.epoch().expect("should get epoch"); + let attestation_epoch = attestation.data.target.epoch; + + if attestation.data.slot <= finalized_epoch.start_slot(E::slots_per_epoch()) + || attestation_epoch + 1 < current_epoch + { checked_pre_fin = true; assert_eq!( res, - Ok(AttestationProcessingOutcome::FinalizedSlot { - attestation: attestation.data.target.epoch, - finalized: finalized_epoch, + Ok(AttestationProcessingOutcome::PastEpoch { + attestation_epoch, + current_epoch, }) ); } else { diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index 99a5276e11..d0711d8972 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -442,11 +442,23 @@ fn attestations_with_increasing_slots() { harness.advance_slot(); } + let current_epoch = harness.chain.epoch().expect("should get epoch"); + for attestation in attestations { - assert_eq!( - harness.chain.process_attestation(attestation), - Ok(AttestationProcessingOutcome::Processed) - ) + let attestation_epoch = attestation.data.target.epoch; + let res = harness.chain.process_attestation(attestation); + + if attestation_epoch + 1 < 
current_epoch { + assert_eq!( + res, + Ok(AttestationProcessingOutcome::PastEpoch { + attestation_epoch, + current_epoch, + }) + ) + } else { + assert_eq!(res, Ok(AttestationProcessingOutcome::Processed)) + } } } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 5aff96f2bb..87a734f0da 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -117,13 +117,14 @@ where pub fn beacon_chain_builder( mut self, client_genesis: ClientGenesis, - config: Eth1Config, + config: ClientConfig, ) -> impl Future { let store = self.store.clone(); let store_migrator = self.store_migrator.take(); let chain_spec = self.chain_spec.clone(); let runtime_context = self.runtime_context.clone(); let eth_spec_instance = self.eth_spec_instance.clone(); + let data_dir = config.data_dir.clone(); future::ok(()) .and_then(move |()| { @@ -144,6 +145,7 @@ where .logger(context.log.clone()) .store(store) .store_migrator(store_migrator) + .data_dir(data_dir) .custom_spec(spec.clone()); Ok((builder, spec, context)) @@ -197,11 +199,11 @@ where info!( context.log, "Waiting for eth2 genesis from eth1"; - "eth1_node" => &config.endpoint + "eth1_node" => &config.eth1.endpoint ); let genesis_service = - Eth1GenesisService::new(config, context.log.clone()); + Eth1GenesisService::new(config.eth1, context.log.clone()); let future = genesis_service .wait_for_genesis_state( @@ -233,7 +235,7 @@ where } ClientGenesis::Resume => { let future = builder - .resume_from_db(config) + .resume_from_db(config.eth1) .into_future() .map(|v| (v, None)); diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index a855d78ab6..3335ea1237 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -3,6 +3,8 @@ use serde_derive::{Deserialize, Serialize}; use std::fs; use std::path::PathBuf; +pub const DEFAULT_DATADIR: &str = ".lighthouse"; + /// The number initial validators when starting the `Minimal`. 
const TESTNET_SPEC_CONSTANTS: &str = "minimal"; @@ -70,7 +72,7 @@ pub struct Config { impl Default for Config { fn default() -> Self { Self { - data_dir: PathBuf::from(".lighthouse"), + data_dir: PathBuf::from(DEFAULT_DATADIR), db_name: "chain_db".to_string(), freezer_db_path: None, testnet_dir: None, diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 22d9c7c6ff..6f1214ce50 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -1,6 +1,6 @@ extern crate slog; -mod config; +pub mod config; mod notifier; pub mod builder; diff --git a/beacon_node/network/src/message_processor.rs b/beacon_node/network/src/message_processor.rs index e22ae567a9..99b1a9fce1 100644 --- a/beacon_node/network/src/message_processor.rs +++ b/beacon_node/network/src/message_processor.rs @@ -581,10 +581,16 @@ impl MessageProcessor { // we don't know the block, get the sync manager to handle the block lookup self.send_to_sync(SyncMessage::UnknownBlockHash(peer_id, beacon_block_root)); } - AttestationProcessingOutcome::AttestsToFutureState { .. } + AttestationProcessingOutcome::FutureEpoch { .. } + | AttestationProcessingOutcome::PastEpoch { .. } + | AttestationProcessingOutcome::UnknownTargetRoot { .. } | AttestationProcessingOutcome::FinalizedSlot { .. } => {} // ignore the attestation AttestationProcessingOutcome::Invalid { .. } - | AttestationProcessingOutcome::EmptyAggregationBitfield { .. } => { + | AttestationProcessingOutcome::EmptyAggregationBitfield { .. } + | AttestationProcessingOutcome::AttestsToFutureBlock { .. } + | AttestationProcessingOutcome::InvalidSignature + | AttestationProcessingOutcome::NoCommitteeForSlotAndIndex { .. } + | AttestationProcessingOutcome::BadTargetEpoch { .. } => { // the peer has sent a bad attestation. Remove them. 
self.network.disconnect(peer_id, GoodbyeReason::Fault); } diff --git a/beacon_node/network/src/persisted_dht.rs b/beacon_node/network/src/persisted_dht.rs index c43e5d84fe..bf9d70430f 100644 --- a/beacon_node/network/src/persisted_dht.rs +++ b/beacon_node/network/src/persisted_dht.rs @@ -1,15 +1,13 @@ -use beacon_chain::BeaconChainTypes; use eth2_libp2p::Enr; use rlp; use std::sync::Arc; -use store::Store; -use store::{DBColumn, Error as StoreError, SimpleStoreItem}; -use types::Hash256; +use store::{DBColumn, Error as StoreError, SimpleStoreItem, Store}; +use types::{EthSpec, Hash256}; /// 32-byte key for accessing the `DhtEnrs`. pub const DHT_DB_KEY: &str = "PERSISTEDDHTPERSISTEDDHTPERSISTE"; -pub fn load_dht(store: Arc) -> Vec { +pub fn load_dht, E: EthSpec>(store: Arc) -> Vec { // Load DHT from store let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes()); match store.get(&key) { @@ -22,8 +20,8 @@ pub fn load_dht(store: Arc) -> Vec { } /// Attempt to persist the ENR's in the DHT to `self.store`. 
-pub fn persist_dht( - store: Arc, +pub fn persist_dht, E: EthSpec>( + store: Arc, enrs: Vec, ) -> Result<(), store::Error> { let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes()); diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index bf3b4eaddb..0439e39c8f 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -56,7 +56,7 @@ impl Service { let (network_globals, mut libp2p_service) = LibP2PService::new(config, network_log.clone())?; - for enr in load_dht::(store.clone()) { + for enr in load_dht::(store.clone()) { libp2p_service.swarm.add_enr(enr); } @@ -154,7 +154,7 @@ fn spawn_service( "Number of peers" => format!("{}", enrs.len()), ); - match persist_dht::(store.clone(), enrs) { + match persist_dht::(store.clone(), enrs) { Err(e) => error!( log, "Failed to persist DHT on drop"; diff --git a/beacon_node/network/src/service/tests.rs b/beacon_node/network/src/service/tests.rs index 2d18e4a99e..c77cd064b9 100644 --- a/beacon_node/network/src/service/tests.rs +++ b/beacon_node/network/src/service/tests.rs @@ -3,19 +3,16 @@ mod tests { use crate::persisted_dht::load_dht; use crate::{NetworkConfig, Service}; - use beacon_chain::builder::BeaconChainBuilder; - use beacon_chain::slot_clock::TestingSlotClock; + use beacon_chain::test_utils::BeaconChainHarness; use eth2_libp2p::Enr; use futures::{Future, IntoFuture}; - use genesis::{generate_deterministic_keypairs, interop_genesis_state}; use slog::Logger; use sloggers::{null::NullLoggerBuilder, Build}; use std::str::FromStr; use std::sync::Arc; - use store::{migrate::NullMigrator, SimpleDiskStore}; - use tempdir::TempDir; + use store::MemoryStore; use tokio::runtime::Runtime; - use types::{EthSpec, MinimalEthSpec}; + use types::{test_utils::generate_deterministic_keypairs, MinimalEthSpec}; fn get_logger() -> Logger { let builder = NullLoggerBuilder; @@ -24,39 +21,14 @@ mod tests { #[test] fn test_dht_persistence() { - // Create new LevelDB store - let 
path = TempDir::new("persistence_test").unwrap(); - let store = Arc::new(SimpleDiskStore::open(&path.into_path()).unwrap()); - // Create a `BeaconChain` object to pass to `Service` - let validator_count = 1; - let genesis_time = 13371337; - let log = get_logger(); - let spec = MinimalEthSpec::default_spec(); - let genesis_state = interop_genesis_state( - &generate_deterministic_keypairs(validator_count), - genesis_time, - &spec, - ) - .expect("should create interop genesis state"); - let chain = BeaconChainBuilder::new(MinimalEthSpec) - .logger(log.clone()) - .store(store.clone()) - .store_migrator(NullMigrator) - .genesis_state(genesis_state) - .expect("should build state using recent genesis") - .dummy_eth1_backend() - .expect("should build the dummy eth1 backend") - .null_event_handler() - .testing_slot_clock(std::time::Duration::from_secs(1)) - .expect("should configure testing slot clock") - .reduced_tree_fork_choice() - .expect("should add fork choice to builder") - .build() - .expect("should build"); + let beacon_chain = Arc::new( + BeaconChainHarness::new(MinimalEthSpec, generate_deterministic_keypairs(8)).chain, + ); + + let store = beacon_chain.store.clone(); - let beacon_chain = Arc::new(chain); let enr1 = Enr::from_str("enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjzCBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQPKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8").unwrap(); let enr2 = Enr::from_str("enr:-IS4QJ2d11eu6dC7E7LoXeLMgMP3kom1u3SE8esFSWvaHoo0dP1jg8O3-nx9ht-EO3CmG7L6OkHcMmoIh00IYWB92QABgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIB_c-jQMOXsbjWkbN-Oj99H57gfId5pfb4wa1qxwV4CIN1ZHCCIyk").unwrap(); let enrs = vec![enr1, enr2]; @@ -79,19 +51,7 @@ mod tests { .unwrap(); // Load the persisted dht from the store - let persisted_enrs = load_dht::< - beacon_chain::builder::Witness< - SimpleDiskStore, - store::migrate::NullMigrator, - TestingSlotClock, - beacon_chain::eth1_chain::CachingEth1Backend< - 
types::eth_spec::MinimalEthSpec, - SimpleDiskStore, - >, - types::eth_spec::MinimalEthSpec, - beacon_chain::events::NullEventHandler, - >, - >(store); + let persisted_enrs = load_dht::, MinimalEthSpec>(store); assert!( persisted_enrs.contains(&enrs[0]), "should have persisted the first ENR to store" diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index fe458b9836..bdf759281d 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -1,5 +1,5 @@ use clap::ArgMatches; -use client::{ClientConfig, ClientGenesis, Eth2Config}; +use client::{config::DEFAULT_DATADIR, ClientConfig, ClientGenesis, Eth2Config}; use eth2_config::{read_from_file, write_to_file}; use eth2_libp2p::{Enr, Multiaddr}; use eth2_testnet_config::Eth2TestnetConfig; @@ -47,7 +47,7 @@ pub fn get_configs( client_config.data_dir = cli_args .value_of("datadir") .map(|path| PathBuf::from(path).join(BEACON_NODE_DIR)) - .or_else(|| dirs::home_dir().map(|home| home.join(".lighthouse").join(BEACON_NODE_DIR))) + .or_else(|| dirs::home_dir().map(|home| home.join(DEFAULT_DATADIR).join(BEACON_NODE_DIR))) .unwrap_or_else(|| PathBuf::from(".")); // Load the client config, if it exists . diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 36df8ce1ed..ca758c8c39 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -76,7 +76,7 @@ impl ProductionBeaconNode { ) -> impl Future { let http_eth2_config = context.eth2_config().clone(); let spec = context.eth2_config().spec.clone(); - let genesis_eth1_config = client_config.eth1.clone(); + let client_config_1 = client_config.clone(); let client_genesis = client_config.genesis.clone(); let store_config = client_config.store.clone(); let log = context.log.clone(); @@ -93,9 +93,7 @@ impl ProductionBeaconNode { .disk_store(&db_path, &freezer_db_path_res?, store_config)? .background_migrator()?) 
}) - .and_then(move |builder| { - builder.beacon_chain_builder(client_genesis, genesis_eth1_config) - }) + .and_then(move |builder| builder.beacon_chain_builder(client_genesis, client_config_1)) .and_then(move |builder| { let builder = if client_config.sync_eth1_chain && !client_config.dummy_eth1_backend { diff --git a/eth2/operation_pool/src/attestation.rs b/eth2/operation_pool/src/attestation.rs index 3b56061322..5f13d1ea81 100644 --- a/eth2/operation_pool/src/attestation.rs +++ b/eth2/operation_pool/src/attestation.rs @@ -18,7 +18,10 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> { spec: &ChainSpec, ) -> Option { let fresh_validators = earliest_attestation_validators(att, state); - let indices = get_attesting_indices(state, &att.data, &fresh_validators).ok()?; + let committee = state + .get_beacon_committee(att.data.slot, att.data.index) + .ok()?; + let indices = get_attesting_indices::(committee.committee, &fresh_validators).ok()?; let fresh_validators_rewards: HashMap = indices .iter() .map(|i| *i as u64) diff --git a/eth2/operation_pool/src/attestation_id.rs b/eth2/operation_pool/src/attestation_id.rs index 38eb8f9e2a..b70a0bfc89 100644 --- a/eth2/operation_pool/src/attestation_id.rs +++ b/eth2/operation_pool/src/attestation_id.rs @@ -2,7 +2,7 @@ use int_to_bytes::int_to_bytes8; use serde_derive::{Deserialize, Serialize}; use ssz::ssz_encode; use ssz_derive::{Decode, Encode}; -use types::{AttestationData, BeaconState, ChainSpec, Domain, Epoch, EthSpec}; +use types::{AttestationData, ChainSpec, Domain, Epoch, Fork}; /// Serialized `AttestationData` augmented with a domain to encode the fork info. 
#[derive( @@ -16,23 +16,15 @@ pub struct AttestationId { const DOMAIN_BYTES_LEN: usize = 8; impl AttestationId { - pub fn from_data( - attestation: &AttestationData, - state: &BeaconState, - spec: &ChainSpec, - ) -> Self { + pub fn from_data(attestation: &AttestationData, fork: &Fork, spec: &ChainSpec) -> Self { let mut bytes = ssz_encode(attestation); let epoch = attestation.target.epoch; - bytes.extend_from_slice(&AttestationId::compute_domain_bytes(epoch, state, spec)); + bytes.extend_from_slice(&AttestationId::compute_domain_bytes(epoch, fork, spec)); AttestationId { v: bytes } } - pub fn compute_domain_bytes( - epoch: Epoch, - state: &BeaconState, - spec: &ChainSpec, - ) -> Vec { - int_to_bytes8(spec.get_domain(epoch, Domain::BeaconAttester, &state.fork)) + pub fn compute_domain_bytes(epoch: Epoch, fork: &Fork, spec: &ChainSpec) -> Vec { + int_to_bytes8(spec.get_domain(epoch, Domain::BeaconAttester, fork)) } pub fn domain_bytes_match(&self, domain_bytes: &[u8]) -> bool { diff --git a/eth2/operation_pool/src/lib.rs b/eth2/operation_pool/src/lib.rs index 4786be3cb0..02f5457790 100644 --- a/eth2/operation_pool/src/lib.rs +++ b/eth2/operation_pool/src/lib.rs @@ -22,7 +22,7 @@ use std::collections::{hash_map, HashMap, HashSet}; use std::marker::PhantomData; use types::{ typenum::Unsigned, Attestation, AttesterSlashing, BeaconState, BeaconStateError, ChainSpec, - EthSpec, ProposerSlashing, RelativeEpoch, SignedVoluntaryExit, Validator, + EthSpec, Fork, ProposerSlashing, RelativeEpoch, SignedVoluntaryExit, Validator, }; #[derive(Default, Debug)] @@ -57,10 +57,10 @@ impl OperationPool { pub fn insert_attestation( &self, attestation: Attestation, - state: &BeaconState, + fork: &Fork, spec: &ChainSpec, ) -> Result<(), AttestationValidationError> { - let id = AttestationId::from_data(&attestation.data, state, spec); + let id = AttestationId::from_data(&attestation.data, fork, spec); // Take a write lock on the attestations map. 
let mut attestations = self.attestations.write(); @@ -106,8 +106,9 @@ impl OperationPool { // Attestations for the current fork, which may be from the current or previous epoch. let prev_epoch = state.previous_epoch(); let current_epoch = state.current_epoch(); - let prev_domain_bytes = AttestationId::compute_domain_bytes(prev_epoch, state, spec); - let curr_domain_bytes = AttestationId::compute_domain_bytes(current_epoch, state, spec); + let prev_domain_bytes = AttestationId::compute_domain_bytes(prev_epoch, &state.fork, spec); + let curr_domain_bytes = + AttestationId::compute_domain_bytes(current_epoch, &state.fork, spec); let reader = self.attestations.read(); let active_indices = state .get_cached_active_validator_indices(RelativeEpoch::Current) @@ -180,8 +181,8 @@ impl OperationPool { spec: &ChainSpec, ) -> (AttestationId, AttestationId) { ( - AttestationId::from_data(&slashing.attestation_1.data, state, spec), - AttestationId::from_data(&slashing.attestation_2.data, state, spec), + AttestationId::from_data(&slashing.attestation_1.data, &state.fork, spec), + AttestationId::from_data(&slashing.attestation_2.data, &state.fork, spec), ) } @@ -547,7 +548,7 @@ mod release_tests { spec, None, ); - op_pool.insert_attestation(att, state, spec).unwrap(); + op_pool.insert_attestation(att, &state.fork, spec).unwrap(); } } @@ -616,9 +617,9 @@ mod release_tests { None, ); op_pool - .insert_attestation(att.clone(), state, spec) + .insert_attestation(att.clone(), &state.fork, spec) .unwrap(); - op_pool.insert_attestation(att, state, spec).unwrap(); + op_pool.insert_attestation(att, &state.fork, spec).unwrap(); } assert_eq!(op_pool.num_attestations(), committees.len()); @@ -655,7 +656,7 @@ mod release_tests { spec, None, ); - op_pool.insert_attestation(att, state, spec).unwrap(); + op_pool.insert_attestation(att, &state.fork, spec).unwrap(); } } @@ -703,7 +704,7 @@ mod release_tests { spec, if i == 0 { None } else { Some(0) }, ); - op_pool.insert_attestation(att, state, 
spec).unwrap(); + op_pool.insert_attestation(att, &state.fork, spec).unwrap(); } }; @@ -776,7 +777,7 @@ mod release_tests { spec, if i == 0 { None } else { Some(0) }, ); - op_pool.insert_attestation(att, state, spec).unwrap(); + op_pool.insert_attestation(att, &state.fork, spec).unwrap(); } }; @@ -816,8 +817,14 @@ mod release_tests { for att in &best_attestations { let fresh_validators_bitlist = earliest_attestation_validators(att, state); - let att_indices = - get_attesting_indices(state, &att.data, &fresh_validators_bitlist).unwrap(); + let committee = state + .get_beacon_committee(att.data.slot, att.data.index) + .expect("should get beacon committee"); + let att_indices = get_attesting_indices::( + committee.committee, + &fresh_validators_bitlist, + ) + .unwrap(); let fresh_indices = &att_indices - &seen_indices; let rewards = fresh_indices diff --git a/eth2/proto_array_fork_choice/src/fork_choice_test_definition.rs b/eth2/proto_array_fork_choice/src/fork_choice_test_definition.rs index b016ed0421..c74e33c456 100644 --- a/eth2/proto_array_fork_choice/src/fork_choice_test_definition.rs +++ b/eth2/proto_array_fork_choice/src/fork_choice_test_definition.rs @@ -57,6 +57,7 @@ impl ForkChoiceTestDefinition { pub fn run(self) { let fork_choice = ProtoArrayForkChoice::new( self.finalized_block_slot, + Hash256::zero(), self.justified_epoch, self.finalized_epoch, self.finalized_root, @@ -120,7 +121,14 @@ impl ForkChoiceTestDefinition { finalized_epoch, } => { fork_choice - .process_block(slot, root, parent_root, justified_epoch, finalized_epoch) + .process_block( + slot, + root, + parent_root, + Hash256::zero(), + justified_epoch, + finalized_epoch, + ) .expect(&format!( "process_block op at index {} returned error", op_index diff --git a/eth2/proto_array_fork_choice/src/proto_array.rs b/eth2/proto_array_fork_choice/src/proto_array.rs index 85d47ede99..ece8648bff 100644 --- a/eth2/proto_array_fork_choice/src/proto_array.rs +++ 
b/eth2/proto_array_fork_choice/src/proto_array.rs @@ -9,6 +9,9 @@ pub struct ProtoNode { /// The `slot` is not necessary for `ProtoArray`, it just exists so external components can /// easily query the block slot. This is useful for upstream fork choice logic. pub slot: Slot, + /// The `state_root` is not necessary for `ProtoArray` either, it also just exists for upstream + /// components (namely attestation verification). + pub state_root: Hash256, root: Hash256, parent: Option, justified_epoch: Epoch, @@ -126,6 +129,7 @@ impl ProtoArray { slot: Slot, root: Hash256, parent_opt: Option, + state_root: Hash256, justified_epoch: Epoch, finalized_epoch: Epoch, ) -> Result<(), Error> { @@ -138,6 +142,7 @@ impl ProtoArray { let node = ProtoNode { slot, + state_root, root, parent: parent_opt.and_then(|parent| self.indices.get(&parent).copied()), justified_epoch, diff --git a/eth2/proto_array_fork_choice/src/proto_array_fork_choice.rs b/eth2/proto_array_fork_choice/src/proto_array_fork_choice.rs index 0112b690c6..8fc390003f 100644 --- a/eth2/proto_array_fork_choice/src/proto_array_fork_choice.rs +++ b/eth2/proto_array_fork_choice/src/proto_array_fork_choice.rs @@ -60,6 +60,7 @@ impl PartialEq for ProtoArrayForkChoice { impl ProtoArrayForkChoice { pub fn new( finalized_block_slot: Slot, + finalized_block_state_root: Hash256, justified_epoch: Epoch, finalized_epoch: Epoch, finalized_root: Hash256, @@ -77,6 +78,7 @@ impl ProtoArrayForkChoice { finalized_block_slot, finalized_root, None, + finalized_block_state_root, justified_epoch, finalized_epoch, ) @@ -111,6 +113,7 @@ impl ProtoArrayForkChoice { slot: Slot, block_root: Hash256, parent_root: Hash256, + state_root: Hash256, justified_epoch: Epoch, finalized_epoch: Epoch, ) -> Result<(), String> { @@ -120,6 +123,7 @@ impl ProtoArrayForkChoice { slot, block_root, Some(parent_root), + state_root, justified_epoch, finalized_epoch, ) @@ -186,6 +190,15 @@ impl ProtoArrayForkChoice { Some(block.slot) } + pub fn 
block_slot_and_state_root(&self, block_root: &Hash256) -> Option<(Slot, Hash256)> { + let proto_array = self.proto_array.read(); + + let i = proto_array.indices.get(block_root)?; + let block = proto_array.nodes.get(*i)?; + + Some((block.slot, block.state_root)) + } + pub fn latest_message(&self, validator_index: usize) -> Option<(Hash256, Epoch)> { let votes = self.votes.read(); diff --git a/eth2/state_processing/src/common/get_attesting_indices.rs b/eth2/state_processing/src/common/get_attesting_indices.rs index c573328658..1bfbcabd50 100644 --- a/eth2/state_processing/src/common/get_attesting_indices.rs +++ b/eth2/state_processing/src/common/get_attesting_indices.rs @@ -5,18 +5,14 @@ use types::*; /// /// Spec v0.10.1 pub fn get_attesting_indices( - state: &BeaconState, - attestation_data: &AttestationData, + committee: &[usize], bitlist: &BitList, ) -> Result, BeaconStateError> { - let committee = state.get_beacon_committee(attestation_data.slot, attestation_data.index)?; - - if bitlist.len() != committee.committee.len() { + if bitlist.len() != committee.len() { return Err(BeaconStateError::InvalidBitfield); } Ok(committee - .committee .iter() .enumerate() .filter_map(|(i, validator_index)| match bitlist.get(i) { diff --git a/eth2/state_processing/src/common/get_indexed_attestation.rs b/eth2/state_processing/src/common/get_indexed_attestation.rs index 6804db4068..263cdb0994 100644 --- a/eth2/state_processing/src/common/get_indexed_attestation.rs +++ b/eth2/state_processing/src/common/get_indexed_attestation.rs @@ -8,11 +8,10 @@ type Result = std::result::Result>; /// /// Spec v0.10.1 pub fn get_indexed_attestation( - state: &BeaconState, + committee: &[usize], attestation: &Attestation, ) -> Result> { - let attesting_indices = - get_attesting_indices(state, &attestation.data, &attestation.aggregation_bits)?; + let attesting_indices = get_attesting_indices::(committee, &attestation.aggregation_bits)?; Ok(IndexedAttestation { attesting_indices: VariableList::new( 
diff --git a/eth2/state_processing/src/lib.rs b/eth2/state_processing/src/lib.rs index 0c82527e01..e02f04ec1e 100644 --- a/eth2/state_processing/src/lib.rs +++ b/eth2/state_processing/src/lib.rs @@ -10,7 +10,8 @@ pub mod test_utils; pub use genesis::{initialize_beacon_state_from_eth1, is_valid_genesis_state, process_activations}; pub use per_block_processing::{ - errors::BlockProcessingError, per_block_processing, BlockSignatureStrategy, VerifySignatures, + errors::BlockProcessingError, per_block_processing, signature_sets, BlockSignatureStrategy, + VerifySignatures, }; pub use per_epoch_processing::{errors::EpochProcessingError, per_epoch_processing}; pub use per_slot_processing::{per_slot_processing, Error as SlotProcessingError}; diff --git a/eth2/state_processing/src/per_block_processing.rs b/eth2/state_processing/src/per_block_processing.rs index 844ae850f1..7285f2dacd 100644 --- a/eth2/state_processing/src/per_block_processing.rs +++ b/eth2/state_processing/src/per_block_processing.rs @@ -24,7 +24,7 @@ pub mod block_processing_builder; mod block_signature_verifier; pub mod errors; mod is_valid_indexed_attestation; -mod signature_sets; +pub mod signature_sets; pub mod tests; mod verify_attestation; mod verify_attester_slashing; diff --git a/eth2/state_processing/src/per_block_processing/block_signature_verifier.rs b/eth2/state_processing/src/per_block_processing/block_signature_verifier.rs index 850b439ddb..51be38ff96 100644 --- a/eth2/state_processing/src/per_block_processing/block_signature_verifier.rs +++ b/eth2/state_processing/src/per_block_processing/block_signature_verifier.rs @@ -189,7 +189,11 @@ impl<'a, T: EthSpec> BlockSignatureVerifier<'a, T> { .attestations .iter() .map(|attestation| { - let indexed_attestation = get_indexed_attestation(self.state, attestation)?; + let committee = self + .state + .get_beacon_committee(attestation.data.slot, attestation.data.index)?; + let indexed_attestation = + get_indexed_attestation(committee.committee, 
attestation)?; self.sets.push(indexed_attestation_signature_set( &self.state, diff --git a/eth2/state_processing/src/per_block_processing/signature_sets.rs b/eth2/state_processing/src/per_block_processing/signature_sets.rs index 3c59e402be..a91749af9e 100644 --- a/eth2/state_processing/src/per_block_processing/signature_sets.rs +++ b/eth2/state_processing/src/per_block_processing/signature_sets.rs @@ -8,7 +8,7 @@ use std::convert::TryInto; use tree_hash::TreeHash; use types::{ AggregateSignature, AttesterSlashing, BeaconBlock, BeaconState, BeaconStateError, ChainSpec, - DepositData, Domain, EthSpec, Hash256, IndexedAttestation, ProposerSlashing, PublicKey, + DepositData, Domain, EthSpec, Fork, Hash256, IndexedAttestation, ProposerSlashing, PublicKey, Signature, SignedBeaconBlock, SignedBeaconBlockHeader, SignedRoot, SignedVoluntaryExit, SigningRoot, }; @@ -170,6 +170,32 @@ pub fn indexed_attestation_signature_set<'a, 'b, T: EthSpec>( Ok(SignatureSet::new(signature, vec![signed_message])) } +/// Returns the signature set for the given `indexed_attestation` but pubkeys are supplied directly +/// instead of from the state. +pub fn indexed_attestation_signature_set_from_pubkeys<'a, 'b, T: EthSpec>( + pubkeys: Vec<&'a PublicKey>, + signature: &'a AggregateSignature, + indexed_attestation: &'b IndexedAttestation, + fork: &Fork, + spec: &'a ChainSpec, +) -> Result> { + let pubkeys = pubkeys + .into_iter() + .map(|pubkey| Cow::Borrowed(&pubkey.as_raw().point)) + .collect(); + + let domain = spec.get_domain( + indexed_attestation.data.target.epoch, + Domain::BeaconAttester, + &fork, + ); + + let message = indexed_attestation.data.signing_root(domain); + let signed_message = SignedMessage::new(pubkeys, message.as_bytes().to_vec()); + + Ok(SignatureSet::new(signature, vec![signed_message])) +} + /// Returns the signature set for the given `attester_slashing` and corresponding `pubkeys`. 
pub fn attester_slashing_signature_sets<'a, T: EthSpec>( state: &'a BeaconState, diff --git a/eth2/state_processing/src/per_block_processing/verify_attestation.rs b/eth2/state_processing/src/per_block_processing/verify_attestation.rs index 91dba2d7c3..a340c3109e 100644 --- a/eth2/state_processing/src/per_block_processing/verify_attestation.rs +++ b/eth2/state_processing/src/per_block_processing/verify_attestation.rs @@ -67,7 +67,8 @@ pub fn verify_attestation_for_state( verify_casper_ffg_vote(attestation, state)?; // Check signature and bitfields - let indexed_attestation = get_indexed_attestation(state, attestation)?; + let committee = state.get_beacon_committee(attestation.data.slot, attestation.data.index)?; + let indexed_attestation = get_indexed_attestation(committee.committee, attestation)?; is_valid_indexed_attestation(state, &indexed_attestation, verify_signatures, spec)?; Ok(()) diff --git a/eth2/state_processing/src/per_epoch_processing/validator_statuses.rs b/eth2/state_processing/src/per_epoch_processing/validator_statuses.rs index cbcf42fe9f..28521bf894 100644 --- a/eth2/state_processing/src/per_epoch_processing/validator_statuses.rs +++ b/eth2/state_processing/src/per_epoch_processing/validator_statuses.rs @@ -195,7 +195,9 @@ impl ValidatorStatuses { .iter() .chain(state.current_epoch_attestations.iter()) { - let attesting_indices = get_attesting_indices(state, &a.data, &a.aggregation_bits)?; + let committee = state.get_beacon_committee(a.data.slot, a.data.index)?; + let attesting_indices = + get_attesting_indices::(committee.committee, &a.aggregation_bits)?; let mut status = ValidatorStatus::default();