From 6f919e6f7dbb6879afcd2af51409184887748d49 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Thu, 14 Mar 2019 11:53:50 +1100 Subject: [PATCH] Add first iteration on faster rewards processing. --- .../benches/bench_epoch_processing.rs | 19 +- eth2/state_processing/benches/benches.rs | 2 +- .../src/per_epoch_processing.rs | 151 ++++++-------- .../src/per_epoch_processing/attesters.rs | 195 ++++++++++++++++++ 4 files changed, 265 insertions(+), 102 deletions(-) create mode 100644 eth2/state_processing/src/per_epoch_processing/attesters.rs diff --git a/eth2/state_processing/benches/bench_epoch_processing.rs b/eth2/state_processing/benches/bench_epoch_processing.rs index e4981b2008..93c6c7ebd7 100644 --- a/eth2/state_processing/benches/bench_epoch_processing.rs +++ b/eth2/state_processing/benches/bench_epoch_processing.rs @@ -150,13 +150,15 @@ fn bench_epoch_processing(c: &mut Criterion, state: &BeaconState, spec: &ChainSp let state_clone = state.clone(); let spec_clone = spec.clone(); + let active_validator_indices = calculate_active_validator_indices(&state, &spec); c.bench( &format!("{}/epoch_processing", desc), Benchmark::new("calculate_attester_sets", move |b| { b.iter_batched( || state_clone.clone(), |mut state| { - calculate_attester_sets(&mut state, &spec_clone).unwrap(); + calculate_attester_sets(&mut state, &active_validator_indices, &spec_clone) + .unwrap(); state }, criterion::BatchSize::SmallInput, @@ -168,8 +170,8 @@ fn bench_epoch_processing(c: &mut Criterion, state: &BeaconState, spec: &ChainSp let state_clone = state.clone(); let spec_clone = spec.clone(); let previous_epoch = state.previous_epoch(&spec); - let attesters = calculate_attester_sets(&state, &spec).unwrap(); let active_validator_indices = calculate_active_validator_indices(&state, &spec); + let attesters = calculate_attester_sets(&state, &active_validator_indices, &spec).unwrap(); let current_total_balance = state.get_total_balance(&active_validator_indices[..], &spec); let previous_total_balance = state.get_total_balance( &get_active_validator_indices(&state.validator_registry, previous_epoch)[..], @@ -185,8 +187,8 @@ fn bench_epoch_processing(c: &mut Criterion, state: &BeaconState, spec: &ChainSp &mut state, current_total_balance, previous_total_balance, - attesters.previous_epoch_boundary.balance, - attesters.current_epoch_boundary.balance, + attesters.balances.previous_epoch_boundary, + attesters.balances.current_epoch_boundary, &spec_clone, ); state @@ -214,8 +216,8 @@ fn bench_epoch_processing(c: &mut Criterion, state: &BeaconState, spec: &ChainSp let mut state_clone = state.clone(); let spec_clone = spec.clone(); let previous_epoch = state.previous_epoch(&spec); - let attesters = calculate_attester_sets(&state, &spec).unwrap(); let active_validator_indices = calculate_active_validator_indices(&state, &spec); + let attesters = calculate_attester_sets(&state, &active_validator_indices, &spec).unwrap(); let previous_total_balance = state.get_total_balance( &get_active_validator_indices(&state.validator_registry, previous_epoch)[..], &spec, @@ -229,7 +231,6 @@ fn bench_epoch_processing(c: &mut Criterion, state: &BeaconState, spec: &ChainSp |mut state| { process_rewards_and_penalities( &mut state, - &active_validator_indices, &attesters, previous_total_balance, &winning_root_for_shards, @@ -264,8 +265,8 @@ fn bench_epoch_processing(c: &mut Criterion, state: &BeaconState, spec: &ChainSp let mut state_clone = state.clone(); let spec_clone = spec.clone(); let previous_epoch = state.previous_epoch(&spec); - let attesters = calculate_attester_sets(&state, &spec).unwrap(); let active_validator_indices = calculate_active_validator_indices(&state, &spec); + let attesters = calculate_attester_sets(&state, &active_validator_indices, &spec).unwrap(); let current_total_balance = state.get_total_balance(&active_validator_indices[..], spec); let previous_total_balance = state.get_total_balance( &get_active_validator_indices(&state.validator_registry, previous_epoch)[..], @@ -279,8 +280,8 @@ fn bench_epoch_processing(c: &mut Criterion, state: &BeaconState, spec: &ChainSp &mut state_clone, current_total_balance, previous_total_balance, - attesters.previous_epoch_boundary.balance, - attesters.current_epoch_boundary.balance, + attesters.balances.previous_epoch_boundary, + attesters.balances.current_epoch_boundary, spec, ); assert!( diff --git a/eth2/state_processing/benches/benches.rs b/eth2/state_processing/benches/benches.rs index ad8c4f714f..c619e1ef75 100644 --- a/eth2/state_processing/benches/benches.rs +++ b/eth2/state_processing/benches/benches.rs @@ -18,8 +18,8 @@ pub fn state_processing(c: &mut Criterion) { Builder::from_env(Env::default().default_filter_or(LOG_LEVEL)).init(); } - bench_block_processing::bench_block_processing_n_validators(c, VALIDATOR_COUNT); bench_epoch_processing::bench_epoch_processing_n_validators(c, VALIDATOR_COUNT); + bench_block_processing::bench_block_processing_n_validators(c, VALIDATOR_COUNT); } criterion_group!(benches, state_processing); diff --git a/eth2/state_processing/src/per_epoch_processing.rs b/eth2/state_processing/src/per_epoch_processing.rs index 4abbe012cd..4fe53dd6b3 100644 --- a/eth2/state_processing/src/per_epoch_processing.rs +++ b/eth2/state_processing/src/per_epoch_processing.rs @@ -1,6 +1,5 @@ -use attester_sets::AttesterSets; +use attesters::Attesters; use errors::EpochProcessingError as Error; -use fnv::FnvHashMap; use fnv::FnvHashSet; use integer_sqrt::IntegerSquareRoot; use rayon::prelude::*; @@ -11,6 +10,7 @@ use types::{validator_registry::get_active_validator_indices, *}; use winning_root::{winning_root, WinningRoot}; pub mod attester_sets; +pub mod attesters; pub mod errors; pub mod inclusion_distance; pub mod tests; @@ -35,8 +35,6 @@ pub fn per_epoch_processing(state: &mut BeaconState, spec: &ChainSpec) -> Result state.build_epoch_cache(RelativeEpoch::Current, spec)?; state.build_epoch_cache(RelativeEpoch::Next, spec)?; - let attesters = calculate_attester_sets(&state, spec)?; - let active_validator_indices = calculate_active_validator_indices(&state, spec); let current_total_balance = state.get_total_balance(&active_validator_indices[..], spec); @@ -46,14 +44,16 @@ pub fn per_epoch_processing(state: &mut BeaconState, spec: &ChainSpec) -> Result spec, ); + let attesters = calculate_attester_sets(&state, &active_validator_indices, spec)?; + process_eth1_data(state, spec); process_justification( state, current_total_balance, previous_total_balance, - attesters.previous_epoch_boundary.balance, - attesters.current_epoch_boundary.balance, + attesters.balances.previous_epoch_boundary, + attesters.balances.current_epoch_boundary, spec, ); @@ -63,7 +63,6 @@ pub fn per_epoch_processing(state: &mut BeaconState, spec: &ChainSpec) -> Result // Rewards and Penalities process_rewards_and_penalities( state, - &active_validator_indices, &attesters, previous_total_balance, &winning_root_for_shards, @@ -107,9 +106,13 @@ pub fn calculate_active_validator_indices(state: &BeaconState, spec: &ChainSpec) /// Spec v0.4.0 pub fn calculate_attester_sets( state: &BeaconState, + active_validator_indices: &[usize], spec: &ChainSpec, -) -> Result { - AttesterSets::new(&state, spec) +) -> Result { + let mut attesters = Attesters::empty(state.validator_registry.len()); + attesters.process_active_validator_indices(&active_validator_indices); + attesters.process_attestations(&state, &state.latest_attestations, spec)?; + Ok(attesters) } /// Spec v0.4.0 @@ -283,22 +286,20 @@ pub fn process_crosslinks( /// Spec v0.4.0 pub fn process_rewards_and_penalities( state: &mut BeaconState, - active_validator_indices: &[usize], - attesters: &AttesterSets, + attesters: &Attesters, previous_total_balance: u64, winning_root_for_shards: &WinningRootHashSet, spec: &ChainSpec, ) -> Result<(), Error> { let next_epoch = state.next_epoch(spec); - let active_validator_indices: FnvHashSet = - FnvHashSet::from_iter(active_validator_indices.iter().cloned()); - + /* let previous_epoch_attestations: Vec<&PendingAttestation> = state .latest_attestations .par_iter() .filter(|a| a.data.slot.epoch(spec.slots_per_epoch) == state.previous_epoch(spec)) .collect(); + */ let base_reward_quotient = previous_total_balance.integer_sqrt() / spec.base_reward_quotient; @@ -309,6 +310,7 @@ pub fn process_rewards_and_penalities( return Err(Error::PreviousTotalBalanceIsZero); } + /* // Map is ValidatorIndex -> ProposerIndex let mut inclusion_slots: FnvHashMap = FnvHashMap::default(); for a in &previous_epoch_attestations { @@ -330,79 +332,55 @@ pub fn process_rewards_and_penalities( ); } } + */ // Justification and finalization let epochs_since_finality = next_epoch - state.finalized_epoch; - if epochs_since_finality <= 4 { - state.validator_balances = state - .validator_balances - .par_iter() - .enumerate() - .map(|(index, &balance)| { - let mut balance = balance; + state.validator_balances = state + .validator_balances + .par_iter() + .enumerate() + .map(|(index, &balance)| { + let mut balance = balance; + let status = &attesters.statuses[index]; + + if epochs_since_finality <= 4 { let base_reward = state.base_reward(index, base_reward_quotient, spec); // Expected FFG source - if attesters.previous_epoch.indices.contains(&index) { + if status.is_previous_epoch { safe_add_assign!( balance, - base_reward * attesters.previous_epoch.balance / previous_total_balance + base_reward * attesters.balances.previous_epoch / previous_total_balance ); - } else if active_validator_indices.contains(&index) { + } else if status.is_active { safe_sub_assign!(balance, base_reward); } // Expected FFG target - if attesters.previous_epoch_boundary.indices.contains(&index) { + if status.is_previous_epoch_boundary { safe_add_assign!( balance, - base_reward * attesters.previous_epoch_boundary.balance + base_reward * attesters.balances.previous_epoch_boundary / previous_total_balance ); - } else if active_validator_indices.contains(&index) { + } else if status.is_active { safe_sub_assign!(balance, base_reward); } // Expected beacon chain head - if attesters.previous_epoch_head.indices.contains(&index) { + if status.is_previous_epoch_head { safe_add_assign!( balance, - base_reward * attesters.previous_epoch_head.balance + base_reward * attesters.balances.previous_epoch_head / previous_total_balance ); - } else if active_validator_indices.contains(&index) { + } else if status.is_active { safe_sub_assign!(balance, base_reward); }; - - if attesters.previous_epoch.indices.contains(&index) { - let base_reward = state.base_reward(index, base_reward_quotient, spec); - - let (inclusion_distance, _) = inclusion_slots - .get(&index) - .expect("Inconsistent inclusion_slots."); - - if *inclusion_distance > 0 { - safe_add_assign!( - balance, - base_reward * spec.min_attestation_inclusion_delay - / inclusion_distance.as_u64() - ) - } - } - - balance - }) - .collect(); - } else { - state.validator_balances = state - .validator_balances - .par_iter() - .enumerate() - .map(|(index, &balance)| { - let mut balance = balance; - + } else { let inactivity_penalty = state.inactivity_penalty( index, epochs_since_finality, @@ -410,14 +388,14 @@ pub fn process_rewards_and_penalities( spec, ); - if active_validator_indices.contains(&index) { - if !attesters.previous_epoch.indices.contains(&index) { + if status.is_active { + if !status.is_previous_epoch { safe_sub_assign!(balance, inactivity_penalty); } - if !attesters.previous_epoch_boundary.indices.contains(&index) { + if !status.is_previous_epoch_boundary { safe_sub_assign!(balance, inactivity_penalty); } - if !attesters.previous_epoch_head.indices.contains(&index) { + if !status.is_previous_epoch_head { safe_sub_assign!(balance, inactivity_penalty); } @@ -426,42 +404,31 @@ pub fn process_rewards_and_penalities( safe_sub_assign!(balance, 2 * inactivity_penalty + base_reward); } } + } - if attesters.previous_epoch.indices.contains(&index) { - let base_reward = state.base_reward(index, base_reward_quotient, spec); - - let (inclusion_distance, _) = inclusion_slots - .get(&index) - .expect("Inconsistent inclusion_slots."); - - if *inclusion_distance > 0 { - safe_add_assign!( - balance, - base_reward * spec.min_attestation_inclusion_delay - / inclusion_distance.as_u64() - ) - } - } - - balance - }) - .collect(); - } + balance + }) + .collect(); // Attestation inclusion - // - for &index in &attesters.previous_epoch.indices { - let (_, proposer_index) = inclusion_slots - .get(&index) - .ok_or_else(|| Error::InclusionSlotsInconsistent(index))?; + for (index, _validator) in state.validator_registry.iter().enumerate() { + let status = &attesters.statuses[index]; - let base_reward = state.base_reward(*proposer_index, base_reward_quotient, spec); + if status.is_previous_epoch { + let proposer_index = status.inclusion_info.proposer_index; + let inclusion_distance = status.inclusion_info.distance; - safe_add_assign!( - state.validator_balances[*proposer_index], - base_reward / spec.attestation_inclusion_reward_quotient - ); + let base_reward = state.base_reward(proposer_index, base_reward_quotient, spec); + + if inclusion_distance > 0 && inclusion_distance < Slot::max_value() { + safe_add_assign!( + state.validator_balances[proposer_index], + base_reward * spec.min_attestation_inclusion_delay + / inclusion_distance.as_u64() + ) + } + } } //Crosslinks diff --git a/eth2/state_processing/src/per_epoch_processing/attesters.rs b/eth2/state_processing/src/per_epoch_processing/attesters.rs new file mode 100644 index 0000000000..662ddceed4 --- /dev/null +++ b/eth2/state_processing/src/per_epoch_processing/attesters.rs @@ -0,0 +1,195 @@ +use types::*; + +macro_rules! set_self_if_other_is_true { + ($self_: ident, $other: ident, $var: ident) => { + $self_.$var = $other.$var & !$self_.$var; + }; +} + +#[derive(Clone)] +pub struct InclusionInfo { + pub slot: Slot, + pub distance: Slot, + pub proposer_index: usize, +} + +impl Default for InclusionInfo { + fn default() -> Self { + Self { + slot: Slot::max_value(), + distance: Slot::max_value(), + proposer_index: 0, + } + } +} + +impl InclusionInfo { + pub fn update(&mut self, other: &Self) { + if other.slot < self.slot { + self.slot = other.slot; + self.distance = other.distance; + self.proposer_index = other.proposer_index; + } + } +} + +#[derive(Default, Clone)] +pub struct AttesterStatus { + pub is_active: bool, + + pub is_current_epoch: bool, + pub is_current_epoch_boundary: bool, + pub is_previous_epoch: bool, + pub is_previous_epoch_boundary: bool, + pub is_previous_epoch_head: bool, + + pub inclusion_info: InclusionInfo, +} + +impl AttesterStatus { + pub fn update(&mut self, other: &Self) { + // Update all the bool fields, only updating `self` if `other` is true (never setting + // `self` to false). + set_self_if_other_is_true!(self, other, is_active); + set_self_if_other_is_true!(self, other, is_current_epoch); + set_self_if_other_is_true!(self, other, is_current_epoch_boundary); + set_self_if_other_is_true!(self, other, is_previous_epoch); + set_self_if_other_is_true!(self, other, is_previous_epoch_boundary); + set_self_if_other_is_true!(self, other, is_previous_epoch_head); + + self.inclusion_info.update(&other.inclusion_info); + } +} + +#[derive(Default, Clone)] +pub struct TotalBalances { + pub current_epoch: u64, + pub current_epoch_boundary: u64, + pub previous_epoch: u64, + pub previous_epoch_boundary: u64, + pub previous_epoch_head: u64, +} + +pub struct Attesters { + pub statuses: Vec, + pub balances: TotalBalances, +} + +impl Attesters { + pub fn empty(num_validators: usize) -> Self { + Self { + statuses: vec![AttesterStatus::default(); num_validators], + balances: TotalBalances::default(), + } + } + + pub fn process_active_validator_indices(&mut self, active_validator_indices: &[usize]) { + let status = AttesterStatus { + is_active: true, + ..AttesterStatus::default() + }; + + for &i in active_validator_indices { + self.statuses[i].update(&status); + } + } + + pub fn process_attestations( + &mut self, + state: &BeaconState, + attestations: &[PendingAttestation], + spec: &ChainSpec, + ) -> Result<(), BeaconStateError> { + for a in attestations { + let attesting_indices = + state.get_attestation_participants(&a.data, &a.aggregation_bitfield, spec)?; + let attesting_balance = state.get_total_balance(&attesting_indices, spec); + + let mut status = AttesterStatus::default(); + + // Profile this attestation, updating the total balances and generating an + // `AttesterStatus` object that applies to all participants in the attestation. + if is_from_epoch(a, state.current_epoch(spec), spec) { + self.balances.current_epoch += attesting_balance; + status.is_current_epoch = true; + + if has_common_epoch_boundary_root(a, state, state.current_epoch(spec), spec)? { + self.balances.current_epoch_boundary += attesting_balance; + status.is_current_epoch_boundary = true; + } + } else if is_from_epoch(a, state.previous_epoch(spec), spec) { + self.balances.previous_epoch += attesting_balance; + status.is_previous_epoch = true; + + // The inclusion slot and distance are only required for previous epoch attesters. + status.inclusion_info = InclusionInfo { + slot: a.inclusion_slot, + distance: inclusion_distance(a), + proposer_index: state.get_beacon_proposer_index(a.inclusion_slot, spec)?, + }; + + if has_common_epoch_boundary_root(a, state, state.previous_epoch(spec), spec)? { + self.balances.previous_epoch_boundary += attesting_balance; + status.is_previous_epoch_boundary = true; + } + + if has_common_beacon_block_root(a, state, spec)? { + self.balances.previous_epoch_head += attesting_balance; + status.is_previous_epoch_head = true; + } + } + + // Loop through the participating validator indices and update the status vec. + for validator_index in attesting_indices { + self.statuses[validator_index].update(&status); + } + } + + Ok(()) + } +} + +fn inclusion_distance(a: &PendingAttestation) -> Slot { + a.inclusion_slot - a.data.slot +} + +/// Returns `true` if some `PendingAttestation` is from the supplied `epoch`. +/// +/// Spec v0.4.0 +fn is_from_epoch(a: &PendingAttestation, epoch: Epoch, spec: &ChainSpec) -> bool { + a.data.slot.epoch(spec.slots_per_epoch) == epoch +} + +/// Returns `true` if a `PendingAttestation` and `BeaconState` share the same beacon block hash for +/// the first slot of the given epoch. +/// +/// Spec v0.4.0 +fn has_common_epoch_boundary_root( + a: &PendingAttestation, + state: &BeaconState, + epoch: Epoch, + spec: &ChainSpec, +) -> Result { + let slot = epoch.start_slot(spec.slots_per_epoch); + let state_boundary_root = *state + .get_block_root(slot, spec) + .ok_or_else(|| BeaconStateError::InsufficientBlockRoots)?; + + Ok(a.data.epoch_boundary_root == state_boundary_root) +} + +/// Returns `true` if a `PendingAttestation` and `BeaconState` share the same beacon block hash for +/// the current slot of the `PendingAttestation`. +/// +/// Spec v0.4.0 +fn has_common_beacon_block_root( + a: &PendingAttestation, + state: &BeaconState, + spec: &ChainSpec, +) -> Result { + let state_block_root = *state + .get_block_root(a.data.slot, spec) + .ok_or_else(|| BeaconStateError::InsufficientBlockRoots)?; + + Ok(a.data.beacon_block_root == state_block_root) +}