From 3b901dc5ecdafc785dcc92712dd76ee60626bda4 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 13 Apr 2021 05:27:42 +0000 Subject: [PATCH] Pack attestations into blocks in parallel (#2307) ## Proposed Changes Use two instances of max cover when packing attestations into blocks: one for the previous epoch, and one for the current epoch. This reduces the amount of computation done by roughly half due to the `O(n^2)` running time of max cover (`2 * (n/2)^2 = n^2/2`). This should help alleviate some load on block proposal, particularly on Prater. --- Cargo.lock | 4 + beacon_node/beacon_chain/src/beacon_chain.rs | 52 +++++-- beacon_node/operation_pool/Cargo.toml | 4 + beacon_node/operation_pool/src/attestation.rs | 7 +- .../operation_pool/src/attester_slashing.rs | 5 +- beacon_node/operation_pool/src/lib.rs | 143 ++++++++++++------ beacon_node/operation_pool/src/max_cover.rs | 40 +++-- beacon_node/operation_pool/src/metrics.rs | 14 ++ 8 files changed, 191 insertions(+), 78 deletions(-) create mode 100644 beacon_node/operation_pool/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index fe4857ea60..16364ec8c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4400,8 +4400,12 @@ dependencies = [ "eth2_ssz", "eth2_ssz_derive", "int_to_bytes", + "itertools 0.10.0", + "lazy_static", + "lighthouse_metrics", "parking_lot", "rand 0.7.3", + "rayon", "serde", "serde_derive", "state_processing", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index a8ec8938e3..e0e45e214f 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1160,6 +1160,26 @@ impl BeaconChain { Ok(signed_aggregate) } + /// Filter an attestation from the op pool for shuffling compatibility. + /// + /// Use the provided `filter_cache` map to memoize results. 
+ pub fn filter_op_pool_attestation( + &self, + filter_cache: &mut HashMap<(Hash256, Epoch), bool>, + att: &Attestation, + state: &BeaconState, + ) -> bool { + *filter_cache + .entry((att.data.beacon_block_root, att.data.target.epoch)) + .or_insert_with(|| { + self.shuffling_is_compatible( + &att.data.beacon_block_root, + att.data.target.epoch, + &state, + ) + }) + } + /// Check that the shuffling at `block_root` is equal to one of the shufflings of `state`. /// /// The `target_epoch` argument determines which shuffling to check compatibility with, it @@ -1968,21 +1988,6 @@ impl BeaconChain { .deposits_for_block_inclusion(&state, &eth1_data, &self.spec)? .into(); - // Map from attestation head block root to shuffling compatibility. - // Used to memoize the `attestation_shuffling_is_compatible` function. - let mut shuffling_filter_cache = HashMap::new(); - let attestation_filter = |att: &&Attestation| -> bool { - *shuffling_filter_cache - .entry((att.data.beacon_block_root, att.data.target.epoch)) - .or_insert_with(|| { - self.shuffling_is_compatible( - &att.data.beacon_block_root, - att.data.target.epoch, - &state, - ) - }) - }; - // Iterate through the naive aggregation pool and ensure all the attestations from there // are included in the operation pool. 
let unagg_import_timer = @@ -2012,9 +2017,24 @@ impl BeaconChain { let attestation_packing_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_ATTESTATION_TIMES); + + let mut prev_filter_cache = HashMap::new(); + let prev_attestation_filter = |att: &&Attestation| { + self.filter_op_pool_attestation(&mut prev_filter_cache, *att, &state) + }; + let mut curr_filter_cache = HashMap::new(); + let curr_attestation_filter = |att: &&Attestation| { + self.filter_op_pool_attestation(&mut curr_filter_cache, *att, &state) + }; + let attestations = self .op_pool - .get_attestations(&state, attestation_filter, &self.spec) + .get_attestations( + &state, + prev_attestation_filter, + curr_attestation_filter, + &self.spec, + ) .map_err(BlockProductionError::OpPoolError)? .into(); drop(attestation_packing_timer); diff --git a/beacon_node/operation_pool/Cargo.toml b/beacon_node/operation_pool/Cargo.toml index c16ab8fb5d..6b20a806dc 100644 --- a/beacon_node/operation_pool/Cargo.toml +++ b/beacon_node/operation_pool/Cargo.toml @@ -5,12 +5,16 @@ authors = ["Michael Sproul "] edition = "2018" [dependencies] +itertools = "0.10.0" int_to_bytes = { path = "../../consensus/int_to_bytes" } +lazy_static = "1.4.0" +lighthouse_metrics = { path = "../../common/lighthouse_metrics" } parking_lot = "0.11.0" types = { path = "../../consensus/types" } state_processing = { path = "../../consensus/state_processing" } eth2_ssz = "0.1.2" eth2_ssz_derive = "0.1.0" +rayon = "1.5.0" serde = "1.0.116" serde_derive = "1.0.116" store = { path = "../store" } diff --git a/beacon_node/operation_pool/src/attestation.rs b/beacon_node/operation_pool/src/attestation.rs index 5f13d1ea81..ea12abaf4a 100644 --- a/beacon_node/operation_pool/src/attestation.rs +++ b/beacon_node/operation_pool/src/attestation.rs @@ -3,6 +3,7 @@ use state_processing::common::{get_attesting_indices, get_base_reward}; use std::collections::HashMap; use types::{Attestation, BeaconState, BitList, ChainSpec, EthSpec}; +#[derive(Debug, Clone)] 
pub struct AttMaxCover<'a, T: EthSpec> { /// Underlying attestation. att: &'a Attestation, @@ -44,8 +45,8 @@ impl<'a, T: EthSpec> MaxCover for AttMaxCover<'a, T> { type Object = Attestation; type Set = HashMap; - fn object(&self) -> Attestation { - self.att.clone() + fn object(&self) -> &Attestation { + self.att } fn covering_set(&self) -> &HashMap { @@ -100,8 +101,6 @@ pub fn earliest_attestation_validators( state_attestations .iter() // In a single epoch, an attester should only be attesting for one slot and index. - // TODO: we avoid including slashable attestations in the state here, - // but maybe we should do something else with them (like construct slashings). .filter(|existing_attestation| { existing_attestation.data.slot == attestation.data.slot && existing_attestation.data.index == attestation.data.index diff --git a/beacon_node/operation_pool/src/attester_slashing.rs b/beacon_node/operation_pool/src/attester_slashing.rs index f9353d2711..ad4cd01ea2 100644 --- a/beacon_node/operation_pool/src/attester_slashing.rs +++ b/beacon_node/operation_pool/src/attester_slashing.rs @@ -3,6 +3,7 @@ use state_processing::per_block_processing::get_slashable_indices_modular; use std::collections::{HashMap, HashSet}; use types::{AttesterSlashing, BeaconState, ChainSpec, EthSpec}; +#[derive(Debug, Clone)] pub struct AttesterSlashingMaxCover<'a, T: EthSpec> { slashing: &'a AttesterSlashing, effective_balances: HashMap, @@ -46,8 +47,8 @@ impl<'a, T: EthSpec> MaxCover for AttesterSlashingMaxCover<'a, T> { type Set = HashMap; /// Extract an object for inclusion in a solution. - fn object(&self) -> AttesterSlashing { - self.slashing.clone() + fn object(&self) -> &AttesterSlashing { + self.slashing } /// Get the set of elements covered. 
diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index c2047eba3f..2e938d591e 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -2,6 +2,7 @@ mod attestation; mod attestation_id; mod attester_slashing; mod max_cover; +mod metrics; mod persistence; pub use persistence::PersistedOperationPool; @@ -9,7 +10,7 @@ pub use persistence::PersistedOperationPool; use attestation::AttMaxCover; use attestation_id::AttestationId; use attester_slashing::AttesterSlashingMaxCover; -use max_cover::maximum_cover; +use max_cover::{maximum_cover, MaxCover}; use parking_lot::RwLock; use state_processing::per_block_processing::errors::AttestationValidationError; use state_processing::per_block_processing::{ @@ -96,49 +97,29 @@ impl OperationPool { self.attestations.read().values().map(Vec::len).sum() } - /// Get a list of attestations for inclusion in a block. - /// - /// The `validity_filter` is a closure that provides extra filtering of the attestations - /// before an approximately optimal bundle is constructed. We use it to provide access - /// to the fork choice data from the `BeaconChain` struct that doesn't logically belong - /// in the operation pool. - pub fn get_attestations( - &self, - state: &BeaconState, - validity_filter: impl FnMut(&&Attestation) -> bool, - spec: &ChainSpec, - ) -> Result>, OpPoolError> { - // Attestations for the current fork, which may be from the current or previous epoch. - let prev_epoch = state.previous_epoch(); - let current_epoch = state.current_epoch(); - let prev_domain_bytes = AttestationId::compute_domain_bytes( - prev_epoch, + /// Return all valid attestations for the given epoch, for use in max cover. 
+ fn get_valid_attestations_for_epoch<'a>( + &'a self, + epoch: Epoch, + all_attestations: &'a HashMap>>, + state: &'a BeaconState, + total_active_balance: u64, + validity_filter: impl FnMut(&&Attestation) -> bool + Send, + spec: &'a ChainSpec, + ) -> impl Iterator> + Send { + let domain_bytes = AttestationId::compute_domain_bytes( + epoch, &state.fork, state.genesis_validators_root, spec, ); - let curr_domain_bytes = AttestationId::compute_domain_bytes( - current_epoch, - &state.fork, - state.genesis_validators_root, - spec, - ); - let reader = self.attestations.read(); - let active_indices = state - .get_cached_active_validator_indices(RelativeEpoch::Current) - .map_err(OpPoolError::GetAttestationsTotalBalanceError)?; - let total_active_balance = state - .get_total_balance(&active_indices, spec) - .map_err(OpPoolError::GetAttestationsTotalBalanceError)?; - let valid_attestations = reader + all_attestations .iter() - .filter(|(key, _)| { - key.domain_bytes_match(&prev_domain_bytes) - || key.domain_bytes_match(&curr_domain_bytes) - }) + .filter(move |(key, _)| key.domain_bytes_match(&domain_bytes)) .flat_map(|(_, attestations)| attestations) - // That are valid... - .filter(|attestation| { + .filter(move |attestation| attestation.data.target.epoch == epoch) + .filter(move |attestation| { + // Ensure attestations are valid for block inclusion verify_attestation_for_block_inclusion( state, attestation, @@ -148,10 +129,77 @@ impl OperationPool { .is_ok() }) .filter(validity_filter) - .flat_map(|att| AttMaxCover::new(att, state, total_active_balance, spec)); + .filter_map(move |att| AttMaxCover::new(att, state, total_active_balance, spec)) + } - Ok(maximum_cover( - valid_attestations, + /// Get a list of attestations for inclusion in a block. + /// + /// The `validity_filter` is a closure that provides extra filtering of the attestations + /// before an approximately optimal bundle is constructed. 
We use it to provide access + /// to the fork choice data from the `BeaconChain` struct that doesn't logically belong + /// in the operation pool. + pub fn get_attestations( + &self, + state: &BeaconState, + prev_epoch_validity_filter: impl FnMut(&&Attestation) -> bool + Send, + curr_epoch_validity_filter: impl FnMut(&&Attestation) -> bool + Send, + spec: &ChainSpec, + ) -> Result>, OpPoolError> { + // Attestations for the current fork, which may be from the current or previous epoch. + let prev_epoch = state.previous_epoch(); + let current_epoch = state.current_epoch(); + let all_attestations = self.attestations.read(); + let active_indices = state + .get_cached_active_validator_indices(RelativeEpoch::Current) + .map_err(OpPoolError::GetAttestationsTotalBalanceError)?; + let total_active_balance = state + .get_total_balance(&active_indices, spec) + .map_err(OpPoolError::GetAttestationsTotalBalanceError)?; + + // Split attestations for the previous & current epochs, so that we + // can optimise them individually in parallel. + let prev_epoch_att = self.get_valid_attestations_for_epoch( + prev_epoch, + &*all_attestations, + state, + total_active_balance, + prev_epoch_validity_filter, + spec, + ); + let curr_epoch_att = self.get_valid_attestations_for_epoch( + current_epoch, + &*all_attestations, + state, + total_active_balance, + curr_epoch_validity_filter, + spec, + ); + + let prev_epoch_limit = std::cmp::min( + T::MaxPendingAttestations::to_usize() + .saturating_sub(state.previous_epoch_attestations.len()), + T::MaxAttestations::to_usize(), + ); + + let (prev_cover, curr_cover) = rayon::join( + move || { + let _timer = metrics::start_timer(&metrics::ATTESTATION_PREV_EPOCH_PACKING_TIME); + // If we're in the genesis epoch, just use the current epoch attestations. 
+ if prev_epoch == current_epoch { + vec![] + } else { + maximum_cover(prev_epoch_att, prev_epoch_limit) + } + }, + move || { + let _timer = metrics::start_timer(&metrics::ATTESTATION_CURR_EPOCH_PACKING_TIME); + maximum_cover(curr_epoch_att, T::MaxAttestations::to_usize()) + }, + ); + + Ok(max_cover::merge_solutions( + curr_cover, + prev_cover, T::MaxAttestations::to_usize(), )) } @@ -231,7 +279,10 @@ impl OperationPool { let attester_slashings = maximum_cover( relevant_attester_slashings, T::MaxAttesterSlashings::to_usize(), - ); + ) + .into_iter() + .map(|cover| cover.object().clone()) + .collect(); (proposer_slashings, attester_slashings) } @@ -619,7 +670,7 @@ mod release_tests { state.slot -= 1; assert_eq!( op_pool - .get_attestations(state, |_| true, spec) + .get_attestations(state, |_| true, |_| true, spec) .expect("should have attestations") .len(), 0 @@ -629,7 +680,7 @@ mod release_tests { state.slot += spec.min_attestation_inclusion_delay; let block_attestations = op_pool - .get_attestations(state, |_| true, spec) + .get_attestations(state, |_| true, |_| true, spec) .expect("Should have block attestations"); assert_eq!(block_attestations.len(), committees.len()); @@ -799,7 +850,7 @@ mod release_tests { state.slot += spec.min_attestation_inclusion_delay; let best_attestations = op_pool - .get_attestations(state, |_| true, spec) + .get_attestations(state, |_| true, |_| true, spec) .expect("should have best attestations"); assert_eq!(best_attestations.len(), max_attestations); @@ -874,7 +925,7 @@ mod release_tests { state.slot += spec.min_attestation_inclusion_delay; let best_attestations = op_pool - .get_attestations(state, |_| true, spec) + .get_attestations(state, |_| true, |_| true, spec) .expect("should have valid best attestations"); assert_eq!(best_attestations.len(), max_attestations); diff --git a/beacon_node/operation_pool/src/max_cover.rs b/beacon_node/operation_pool/src/max_cover.rs index 8188d6939b..be0d4f746f 100644 --- 
a/beacon_node/operation_pool/src/max_cover.rs +++ b/beacon_node/operation_pool/src/max_cover.rs @@ -1,3 +1,5 @@ +use itertools::Itertools; + /// Trait for types that we can compute a maximum cover for. /// /// Terminology: @@ -5,14 +7,14 @@ /// * `element`: something contained in a set, and covered by the covering set of an item /// * `object`: something extracted from an item in order to comprise a solution /// See: https://en.wikipedia.org/wiki/Maximum_coverage_problem -pub trait MaxCover { +pub trait MaxCover: Clone { /// The result type, of which we would eventually like a collection of maximal quality. - type Object; + type Object: Clone; /// The type used to represent sets. type Set: Clone; /// Extract an object for inclusion in a solution. - fn object(&self) -> Self::Object; + fn object(&self) -> &Self::Object; /// Get the set of elements covered. fn covering_set(&self) -> &Self::Set; @@ -42,7 +44,7 @@ impl MaxCoverItem { /// /// * Time complexity: `O(limit * items_iter.len())` /// * Space complexity: `O(item_iter.len())` -pub fn maximum_cover(items_iter: I, limit: usize) -> Vec +pub fn maximum_cover(items_iter: I, limit: usize) -> Vec where I: IntoIterator, T: MaxCover, @@ -58,14 +60,14 @@ where for _ in 0..limit { // Select the item with the maximum score. - let (best_item, best_cover) = match all_items + let best = match all_items .iter_mut() .filter(|x| x.available && x.item.score() != 0) .max_by_key(|x| x.item.score()) { Some(x) => { x.available = false; - (x.item.object(), x.item.covering_set().clone()) + x.item.clone() } None => return result, }; @@ -75,14 +77,32 @@ where all_items .iter_mut() .filter(|x| x.available && x.item.score() != 0) - .for_each(|x| x.item.update_covering_set(&best_item, &best_cover)); + .for_each(|x| { + x.item + .update_covering_set(best.object(), best.covering_set()) + }); - result.push(best_item); + result.push(best); } result } +/// Perform a greedy merge of two max cover solutions, preferring higher-score values. 
+pub fn merge_solutions(cover1: I1, cover2: I2, limit: usize) -> Vec +where + I1: IntoIterator, + I2: IntoIterator, + T: MaxCover, +{ + cover1 + .into_iter() + .merge_by(cover2, |item1, item2| item1.score() >= item2.score()) + .take(limit) + .map(|item| item.object().clone()) + .collect() +} + #[cfg(test)] mod test { use super::*; @@ -96,8 +116,8 @@ mod test { type Object = Self; type Set = Self; - fn object(&self) -> Self { - self.clone() + fn object(&self) -> &Self { + self } fn covering_set(&self) -> &Self { diff --git a/beacon_node/operation_pool/src/metrics.rs b/beacon_node/operation_pool/src/metrics.rs new file mode 100644 index 0000000000..e69dfaa16f --- /dev/null +++ b/beacon_node/operation_pool/src/metrics.rs @@ -0,0 +1,14 @@ +use lazy_static::lazy_static; + +pub use lighthouse_metrics::*; + +lazy_static! { + pub static ref ATTESTATION_PREV_EPOCH_PACKING_TIME: Result = try_create_histogram( + "op_pool_attestation_prev_epoch_packing_time", + "Time to pack previous epoch attestations" + ); + pub static ref ATTESTATION_CURR_EPOCH_PACKING_TIME: Result = try_create_histogram( + "op_pool_attestation_curr_epoch_packing_time", + "Time to pack current epoch attestations" + ); +}