From 38d2d03e3a98bc88ddf747a0eab3b72a2163623f Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 17 Jun 2019 18:07:14 +1000 Subject: [PATCH] op_pool: use max cover algorithm, refactor --- eth2/operation_pool/Cargo.toml | 1 + eth2/operation_pool/src/attestation.rs | 91 +++++++++++ eth2/operation_pool/src/attestation_id.rs | 35 ++++ eth2/operation_pool/src/lib.rs | 139 +++------------- eth2/operation_pool/src/max_cover.rs | 189 ++++++++++++++++++++++ eth2/utils/boolean-bitfield/src/lib.rs | 3 +- 6 files changed, 338 insertions(+), 120 deletions(-) create mode 100644 eth2/operation_pool/src/attestation.rs create mode 100644 eth2/operation_pool/src/attestation_id.rs create mode 100644 eth2/operation_pool/src/max_cover.rs diff --git a/eth2/operation_pool/Cargo.toml b/eth2/operation_pool/Cargo.toml index 67d13013ca..99c7bfc06f 100644 --- a/eth2/operation_pool/Cargo.toml +++ b/eth2/operation_pool/Cargo.toml @@ -5,6 +5,7 @@ authors = ["Michael Sproul "] edition = "2018" [dependencies] +boolean-bitfield = { path = "../utils/boolean-bitfield" } int_to_bytes = { path = "../utils/int_to_bytes" } itertools = "0.8" parking_lot = "0.7" diff --git a/eth2/operation_pool/src/attestation.rs b/eth2/operation_pool/src/attestation.rs new file mode 100644 index 0000000000..a2f71c3a48 --- /dev/null +++ b/eth2/operation_pool/src/attestation.rs @@ -0,0 +1,91 @@ +use crate::max_cover::MaxCover; +use boolean_bitfield::BooleanBitfield; +use types::{Attestation, BeaconState, EthSpec}; + +pub struct AttMaxCover<'a> { + /// Underlying attestation. + att: &'a Attestation, + /// Bitfield of validators that are covered by this attestation. 
+ fresh_validators: BooleanBitfield, +} + +impl<'a> AttMaxCover<'a> { + pub fn new(att: &'a Attestation, fresh_validators: BooleanBitfield) -> Self { + Self { + att, + fresh_validators, + } + } +} + +impl<'a> MaxCover for AttMaxCover<'a> { + type Object = Attestation; + type Set = BooleanBitfield; + + fn object(&self) -> Attestation { + self.att.clone() + } + + fn covering_set(&self) -> &BooleanBitfield { + &self.fresh_validators + } + + /// Sneaky: we keep all the attestations together in one bucket, even though + /// their aggregation bitfields refer to different committees. In order to avoid + /// confusing committees when updating covering sets, we update only those attestations + /// whose shard and epoch match the attestation being included in the solution, by the logic + /// that a shard and epoch uniquely identify a committee. + fn update_covering_set( + &mut self, + best_att: &Attestation, + covered_validators: &BooleanBitfield, + ) { + if self.att.data.shard == best_att.data.shard + && self.att.data.target_epoch == best_att.data.target_epoch + { + self.fresh_validators.difference_inplace(covered_validators); + } + } + + fn score(&self) -> usize { + self.fresh_validators.num_set_bits() + } +} + +/// Extract the validators for which `attestation` would be their earliest in the epoch. +/// +/// The reward paid to a proposer for including an attestation is proportional to the number +/// of validators for which the included attestation is their first in the epoch. The attestation +/// is judged against the state's `current_epoch_attestations` or `previous_epoch_attestations` +/// depending on when it was created, and all those validators who have already attested are +/// removed from the `aggregation_bitfield` before returning it. +// TODO: This could be optimised with a map from validator index to whether that validator has +// attested in each of the current and previous epochs. Currently quadratic in number of validators. 
+pub fn earliest_attestation_validators( + attestation: &Attestation, + state: &BeaconState, +) -> BooleanBitfield { + // Bitfield of validators whose attestations are new/fresh. + let mut new_validators = attestation.aggregation_bitfield.clone(); + + let state_attestations = if attestation.data.target_epoch == state.current_epoch() { + &state.current_epoch_attestations + } else if attestation.data.target_epoch == state.previous_epoch() { + &state.previous_epoch_attestations + } else { + return BooleanBitfield::from_elem(attestation.aggregation_bitfield.len(), false); + }; + + state_attestations + .iter() + // In a single epoch, an attester should only be attesting for one shard. + // TODO: we avoid including slashable attestations in the state here, + // but maybe we should do something else with them (like construct slashings). + .filter(|existing_attestation| existing_attestation.data.shard == attestation.data.shard) + .for_each(|existing_attestation| { + // Remove the validators who have signed the existing attestation (they are not new) + new_validators.difference_inplace(&existing_attestation.aggregation_bitfield); + }); + + new_validators +} diff --git a/eth2/operation_pool/src/attestation_id.rs b/eth2/operation_pool/src/attestation_id.rs new file mode 100644 index 0000000000..ba25174e65 --- /dev/null +++ b/eth2/operation_pool/src/attestation_id.rs @@ -0,0 +1,35 @@ +use int_to_bytes::int_to_bytes8; +use ssz::ssz_encode; +use types::{AttestationData, BeaconState, ChainSpec, Domain, Epoch, EthSpec}; + +/// Serialized `AttestationData` augmented with a domain to encode the fork info. +#[derive(PartialEq, Eq, Clone, Hash, Debug)] +pub struct AttestationId(Vec); + +/// Number of domain bytes that the end of an attestation ID is padded with. 
+const DOMAIN_BYTES_LEN: usize = 8; + +impl AttestationId { + pub fn from_data( + attestation: &AttestationData, + state: &BeaconState, + spec: &ChainSpec, + ) -> Self { + let mut bytes = ssz_encode(attestation); + let epoch = attestation.target_epoch; + bytes.extend_from_slice(&AttestationId::compute_domain_bytes(epoch, state, spec)); + AttestationId(bytes) + } + + pub fn compute_domain_bytes( + epoch: Epoch, + state: &BeaconState, + spec: &ChainSpec, + ) -> Vec { + int_to_bytes8(spec.get_domain(epoch, Domain::Attestation, &state.fork)) + } + + pub fn domain_bytes_match(&self, domain_bytes: &[u8]) -> bool { + &self.0[self.0.len() - DOMAIN_BYTES_LEN..] == domain_bytes + } +} diff --git a/eth2/operation_pool/src/lib.rs b/eth2/operation_pool/src/lib.rs index ec7d5aa905..7157ec9eec 100644 --- a/eth2/operation_pool/src/lib.rs +++ b/eth2/operation_pool/src/lib.rs @@ -1,7 +1,12 @@ -use int_to_bytes::int_to_bytes8; +mod attestation; +mod attestation_id; +mod max_cover; + +use attestation::{earliest_attestation_validators, AttMaxCover}; +use attestation_id::AttestationId; use itertools::Itertools; +use max_cover::maximum_cover; use parking_lot::RwLock; -use ssz::ssz_encode; use state_processing::per_block_processing::errors::{ AttestationValidationError, AttesterSlashingValidationError, DepositValidationError, ExitValidationError, ProposerSlashingValidationError, TransferValidationError, @@ -16,10 +21,9 @@ use state_processing::per_block_processing::{ }; use std::collections::{btree_map::Entry, hash_map, BTreeMap, HashMap, HashSet}; use std::marker::PhantomData; -use types::chain_spec::Domain; use types::{ - Attestation, AttestationData, AttesterSlashing, BeaconState, ChainSpec, Deposit, Epoch, - EthSpec, ProposerSlashing, Transfer, Validator, VoluntaryExit, + Attestation, AttesterSlashing, BeaconState, ChainSpec, Deposit, EthSpec, ProposerSlashing, + Transfer, Validator, VoluntaryExit, }; #[derive(Default)] @@ -43,71 +47,6 @@ pub struct OperationPool { _phantom: 
PhantomData, } -/// Serialized `AttestationData` augmented with a domain to encode the fork info. -#[derive(PartialEq, Eq, Clone, Hash, Debug)] -struct AttestationId(Vec); - -/// Number of domain bytes that the end of an attestation ID is padded with. -const DOMAIN_BYTES_LEN: usize = 8; - -impl AttestationId { - fn from_data( - attestation: &AttestationData, - state: &BeaconState, - spec: &ChainSpec, - ) -> Self { - let mut bytes = ssz_encode(attestation); - let epoch = attestation.target_epoch; - bytes.extend_from_slice(&AttestationId::compute_domain_bytes(epoch, state, spec)); - AttestationId(bytes) - } - - fn compute_domain_bytes( - epoch: Epoch, - state: &BeaconState, - spec: &ChainSpec, - ) -> Vec { - int_to_bytes8(spec.get_domain(epoch, Domain::Attestation, &state.fork)) - } - - fn domain_bytes_match(&self, domain_bytes: &[u8]) -> bool { - &self.0[self.0.len() - DOMAIN_BYTES_LEN..] == domain_bytes - } -} - -/// Compute a fitness score for an attestation. -/// -/// The score is calculated by determining the number of *new* attestations that -/// the aggregate attestation introduces, and is proportional to the size of the reward we will -/// receive for including it in a block. -// TODO: this could be optimised with a map from validator index to whether that validator has -// attested in each of the current and previous epochs. Currently quadractic in number of validators. -fn attestation_score(attestation: &Attestation, state: &BeaconState) -> usize { - // Bitfield of validators whose attestations are new/fresh. - let mut new_validators = attestation.aggregation_bitfield.clone(); - - let state_attestations = if attestation.data.target_epoch == state.current_epoch() { - &state.current_epoch_attestations - } else if attestation.data.target_epoch == state.previous_epoch() { - &state.previous_epoch_attestations - } else { - return 0; - }; - - state_attestations - .iter() - // In a single epoch, an attester should only be attesting for one shard. 
- // TODO: we avoid including slashable attestations in the state here, - // but maybe we should do something else with them (like construct slashings). - .filter(|current_attestation| current_attestation.data.shard == attestation.data.shard) - .for_each(|current_attestation| { - // Remove the validators who have signed the existing attestation (they are not new) - new_validators.difference_inplace(&current_attestation.aggregation_bitfield); - }); - - new_validators.num_set_bits() -} - #[derive(Debug, PartialEq, Clone)] pub enum DepositInsertStatus { /// The deposit was not already in the pool. @@ -176,29 +115,19 @@ impl OperationPool { let current_epoch = state.current_epoch(); let prev_domain_bytes = AttestationId::compute_domain_bytes(prev_epoch, state, spec); let curr_domain_bytes = AttestationId::compute_domain_bytes(current_epoch, state, spec); - self.attestations - .read() + let reader = self.attestations.read(); + let valid_attestations = reader .iter() .filter(|(key, _)| { key.domain_bytes_match(&prev_domain_bytes) || key.domain_bytes_match(&curr_domain_bytes) }) .flat_map(|(_, attestations)| attestations) - // That are not superseded by an attestation included in the state... - .filter(|attestation| !superior_attestation_exists_in_state(state, attestation)) // That are valid... 
.filter(|attestation| validate_attestation(state, attestation, spec).is_ok()) - // Scored by the number of new attestations they introduce (descending) - // TODO: need to consider attestations introduced in THIS block - .map(|att| (att, attestation_score(att, state))) - // Don't include any useless attestations (score 0) - .filter(|&(_, score)| score != 0) - .sorted_by_key(|&(_, score)| std::cmp::Reverse(score)) - // Limited to the maximum number of attestations per block - .take(spec.max_attestations as usize) - .map(|(att, _)| att) - .cloned() - .collect() + .map(|att| AttMaxCover::new(att, earliest_attestation_validators(att, state))); + + maximum_cover(valid_attestations, spec.max_attestations as usize) } /// Remove attestations which are too old to be included in a block. @@ -484,34 +413,6 @@ impl OperationPool { } } -/// Returns `true` if the state already contains a `PendingAttestation` that is superior to the -/// given `attestation`. -/// -/// A validator has nothing to gain from re-including an attestation and it adds load to the -/// network. -/// -/// An existing `PendingAttestation` is superior to an existing `attestation` if: -/// -/// - Their `AttestationData` is equal. -/// - `attestation` does not contain any signatures that `PendingAttestation` does not have. -fn superior_attestation_exists_in_state( - state: &BeaconState, - attestation: &Attestation, -) -> bool { - state - .current_epoch_attestations - .iter() - .chain(state.previous_epoch_attestations.iter()) - .any(|existing_attestation| { - let bitfield = &attestation.aggregation_bitfield; - let existing_bitfield = &existing_attestation.aggregation_bitfield; - - existing_attestation.data == attestation.data - && bitfield.intersection(existing_bitfield).num_set_bits() - == bitfield.num_set_bits() - }) -} - /// Filter up to a maximum number of operations out of an iterator. 
fn filter_limit_operations<'a, T: 'a, I, F>(operations: I, filter: F, limit: u64) -> Vec where @@ -734,15 +635,13 @@ mod tests { state_builder.teleport_to_slot(slot); state_builder.build_caches(&spec).unwrap(); let (state, keypairs) = state_builder.build(); - (state, keypairs, MainnetEthSpec::default_spec()) } #[test] - fn test_attestation_score() { + fn test_earliest_attestation() { let (ref mut state, ref keypairs, ref spec) = attestation_test_state::(1); - let slot = state.slot - 1; let committees = state .get_crosslink_committees_at_slot(slot) @@ -775,9 +674,8 @@ mod tests { assert_eq!( att1.aggregation_bitfield.num_set_bits(), - attestation_score(&att1, state) + earliest_attestation_validators(&att1, state).num_set_bits() ); - state.current_epoch_attestations.push(PendingAttestation { aggregation_bitfield: att1.aggregation_bitfield.clone(), data: att1.data.clone(), @@ -785,7 +683,10 @@ mod tests { proposer_index: 0, }); - assert_eq!(cc.committee.len() - 2, attestation_score(&att2, state)); + assert_eq!( + cc.committee.len() - 2, + earliest_attestation_validators(&att2, state).num_set_bits() + ); } } diff --git a/eth2/operation_pool/src/max_cover.rs b/eth2/operation_pool/src/max_cover.rs new file mode 100644 index 0000000000..75ac140546 --- /dev/null +++ b/eth2/operation_pool/src/max_cover.rs @@ -0,0 +1,189 @@ +/// Trait for types that we can compute a maximum cover for. +/// +/// Terminology: +/// * `item`: something that implements this trait +/// * `element`: something contained in a set, and covered by the covering set of an item +/// * `object`: something extracted from an item in order to comprise a solution +/// See: https://en.wikipedia.org/wiki/Maximum_coverage_problem +pub trait MaxCover { + /// The result type, of which we would eventually like a collection of maximal quality. + type Object; + /// The type used to represent sets. + type Set: Clone; + + /// Extract an object for inclusion in a solution. 
+ fn object(&self) -> Self::Object; + + /// Get the set of elements covered. + fn covering_set(&self) -> &Self::Set; + /// Update the set of items covered, for the inclusion of some object in the solution. + fn update_covering_set(&mut self, max_obj: &Self::Object, max_set: &Self::Set); + /// The quality of this item's covering set, usually its cardinality. + fn score(&self) -> usize; +} + +/// Helper struct to track which items of the input are still available for inclusion. +/// Saves removing elements from the work vector. +struct MaxCoverItem { + item: T, + available: bool, +} + +impl MaxCoverItem { + fn new(item: T) -> Self { + MaxCoverItem { + item, + available: true, + } + } +} + +/// Compute an approximate maximum cover using a greedy algorithm. +/// +/// * Time complexity: `O(limit * items_iter.len())` +/// * Space complexity: `O(item_iter.len())` +pub fn maximum_cover<'a, I, T>(items_iter: I, limit: usize) -> Vec +where + I: IntoIterator, + T: MaxCover, +{ + // Construct an initial vec of all items, marked available. + let mut all_items: Vec<_> = items_iter + .into_iter() + .map(MaxCoverItem::new) + .filter(|x| x.item.score() != 0) + .collect(); + + let mut result = vec![]; + + for _ in 0..limit { + // Select the item with the maximum score. + let (best_item, best_cover) = match all_items + .iter_mut() + .filter(|x| x.available && x.item.score() != 0) + .max_by_key(|x| x.item.score()) + { + Some(x) => { + x.available = false; + (x.item.object(), x.item.covering_set().clone()) + } + None => return result, + }; + + // Update the covering sets of the other items, for the inclusion of the selected item. + // Items covered by the selected item can't be re-covered. 
+ all_items + .iter_mut() + .filter(|x| x.available && x.item.score() != 0) + .for_each(|x| x.item.update_covering_set(&best_item, &best_cover)); + + result.push(best_item); + } + + result +} + +#[cfg(test)] +mod test { + use super::*; + use std::iter::FromIterator; + use std::{collections::HashSet, hash::Hash}; + + impl MaxCover for HashSet + where + T: Clone + Eq + Hash, + { + type Object = Self; + type Set = Self; + + fn object(&self) -> Self { + self.clone() + } + + fn covering_set(&self) -> &Self { + &self + } + + fn update_covering_set(&mut self, _: &Self, other: &Self) { + let mut difference = &*self - other; + std::mem::swap(self, &mut difference); + } + + fn score(&self) -> usize { + self.len() + } + } + + fn example_system() -> Vec> { + vec![ + HashSet::from_iter(vec![3]), + HashSet::from_iter(vec![1, 2, 4, 5]), + HashSet::from_iter(vec![1, 2, 4, 5]), + HashSet::from_iter(vec![1]), + HashSet::from_iter(vec![2, 4, 5]), + ] + } + + #[test] + fn zero_limit() { + let cover = maximum_cover(example_system(), 0); + assert_eq!(cover.len(), 0); + } + + #[test] + fn one_limit() { + let sets = example_system(); + let cover = maximum_cover(sets.clone(), 1); + assert_eq!(cover.len(), 1); + assert_eq!(cover[0], sets[1]); + } + + // Check that even if the limit provides room, we don't include useless items in the soln. + #[test] + fn exclude_zero_score() { + let sets = example_system(); + for k in 2..10 { + let cover = maximum_cover(sets.clone(), k); + assert_eq!(cover.len(), 2); + assert_eq!(cover[0], sets[1]); + assert_eq!(cover[1], sets[0]); + } + } + + fn quality(solution: &[HashSet]) -> usize { + solution.iter().map(HashSet::len).sum() + } + + // Optimal solution is the first three sets (quality 15) but our greedy algorithm + // will select the last three (quality 11). The comment at the end of each line + // shows that set's score at each iteration, with a * indicating that it will be chosen. 
+ #[test] + fn suboptimal() { + let sets = vec![ + HashSet::from_iter(vec![0, 1, 8, 11, 14]), // 5, 3, 2 + HashSet::from_iter(vec![2, 3, 7, 9, 10]), // 5, 3, 2 + HashSet::from_iter(vec![4, 5, 6, 12, 13]), // 5, 4, 2 + HashSet::from_iter(vec![9, 10]), // 4, 4, 2* + HashSet::from_iter(vec![5, 6, 7, 8]), // 4, 4* + HashSet::from_iter(vec![0, 1, 2, 3, 4]), // 5* + ]; + let cover = maximum_cover(sets.clone(), 3); + assert_eq!(quality(&cover), 11); + } + + #[test] + fn intersecting_ok() { + let sets = vec![ + HashSet::from_iter(vec![1, 2, 3, 4, 5, 6, 7, 8]), + HashSet::from_iter(vec![1, 2, 3, 9, 10, 11]), + HashSet::from_iter(vec![4, 5, 6, 12, 13, 14]), + HashSet::from_iter(vec![7, 8, 15, 16, 17, 18]), + HashSet::from_iter(vec![1, 2, 9, 10]), + HashSet::from_iter(vec![1, 5, 6, 8]), + HashSet::from_iter(vec![1, 7, 11, 19]), + ]; + let cover = maximum_cover(sets.clone(), 5); + assert_eq!(quality(&cover), 19); + assert_eq!(cover.len(), 5); + } +} diff --git a/eth2/utils/boolean-bitfield/src/lib.rs b/eth2/utils/boolean-bitfield/src/lib.rs index 08e56e7c3f..ac6ffa89a5 100644 --- a/eth2/utils/boolean-bitfield/src/lib.rs +++ b/eth2/utils/boolean-bitfield/src/lib.rs @@ -13,7 +13,7 @@ use std::default; /// A BooleanBitfield represents a set of booleans compactly stored as a vector of bits. /// The BooleanBitfield is given a fixed size during construction. Reads outside of the current size return an out-of-bounds error. Writes outside of the current size expand the size of the set. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Hash)] pub struct BooleanBitfield(BitVec); /// Error represents some reason a request against a bitfield was not satisfied @@ -170,6 +170,7 @@ impl cmp::PartialEq for BooleanBitfield { ssz::ssz_encode(self) == ssz::ssz_encode(other) } } +impl Eq for BooleanBitfield {} /// Create a new bitfield that is a union of two other bitfields. ///