Fix naive import, greedy aggregation

This commit is contained in:
Michael Sproul
2022-07-07 10:59:19 +10:00
parent d948e9764f
commit 24fdd56baf
8 changed files with 84 additions and 36 deletions

View File

@@ -69,9 +69,12 @@ use slog::{crit, debug, error, info, trace, warn, Logger};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use ssz::Encode; use ssz::Encode;
use state_processing::{ use state_processing::{
common::get_indexed_attestation, common::{get_attesting_indices, get_indexed_attestation},
per_block_processing, per_block_processing,
per_block_processing::errors::AttestationValidationError, per_block_processing::{
errors::AttestationValidationError, verify_attestation_for_block_inclusion,
VerifySignatures,
},
per_slot_processing, per_slot_processing,
state_advance::{complete_state_advance, partial_state_advance}, state_advance::{complete_state_advance, partial_state_advance},
BlockSignatureStrategy, SigVerifiedOp, VerifyBlockRoot, BlockSignatureStrategy, SigVerifiedOp, VerifyBlockRoot,
@@ -3276,8 +3279,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let unagg_import_timer = let unagg_import_timer =
metrics::start_timer(&metrics::BLOCK_PRODUCTION_UNAGGREGATED_TIMES); metrics::start_timer(&metrics::BLOCK_PRODUCTION_UNAGGREGATED_TIMES);
for attestation in self.naive_aggregation_pool.read().iter() { for attestation in self.naive_aggregation_pool.read().iter() {
// FIXME(sproul): put correct attesting indices let import = |attestation: &Attestation<T::EthSpec>| {
if let Err(e) = self.op_pool.insert_attestation(attestation.clone(), vec![]) { let committee =
state.get_beacon_committee(attestation.data.slot, attestation.data.index)?;
let attesting_indices = get_attesting_indices::<T::EthSpec>(
committee.committee,
&attestation.aggregation_bits,
)?;
self.op_pool
.insert_attestation(attestation.clone(), attesting_indices)
};
if let Err(e) = import(attestation) {
// Don't stop block production if there's an error, just create a log. // Don't stop block production if there's an error, just create a log.
error!( error!(
self.log, self.log,
@@ -3306,7 +3318,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.filter_op_pool_attestation(&mut curr_filter_cache, att, &state) self.filter_op_pool_attestation(&mut curr_filter_cache, att, &state)
}; };
let attestations = self let mut attestations = self
.op_pool .op_pool
.get_attestations( .get_attestations(
&state, &state,
@@ -3315,6 +3327,33 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self.spec, &self.spec,
) )
.map_err(BlockProductionError::OpPoolError)?; .map_err(BlockProductionError::OpPoolError)?;
let paranoid = false;
let ultra_paranoid = true;
if paranoid {
let verify_sigs = if ultra_paranoid {
VerifySignatures::True
} else {
VerifySignatures::False
};
attestations.retain(|att| {
let res =
verify_attestation_for_block_inclusion(&state, att, verify_sigs, &self.spec);
if let Err(e) = res {
error!(
self.log,
"Attempted to include an invalid attestation";
"err" => ?e,
"block_slot" => state.slot(),
);
assert!(false);
false
} else {
true
}
});
}
drop(attestation_packing_timer); drop(attestation_packing_timer);
let slot = state.slot(); let slot = state.slot();

View File

@@ -1,3 +1,4 @@
use itertools::Itertools;
use std::collections::HashMap; use std::collections::HashMap;
use types::{ use types::{
AggregateSignature, Attestation, AttestationData, BeaconState, BitList, Checkpoint, Epoch, AggregateSignature, Attestation, AttestationData, BeaconState, BitList, Checkpoint, Epoch,
@@ -18,7 +19,7 @@ pub struct CompactAttestationData {
pub target_root: Hash256, pub target_root: Hash256,
} }
#[derive(Debug)] #[derive(Debug, PartialEq)]
pub struct CompactIndexedAttestation<T: EthSpec> { pub struct CompactIndexedAttestation<T: EthSpec> {
pub attesting_indices: Vec<u64>, pub attesting_indices: Vec<u64>,
pub aggregation_bits: BitList<T::MaxValidatorsPerCommittee>, pub aggregation_bits: BitList<T::MaxValidatorsPerCommittee>,
@@ -107,6 +108,24 @@ impl CheckpointKey {
} }
} }
impl<T: EthSpec> CompactIndexedAttestation<T> {
pub fn signers_disjoint_from(&self, other: &Self) -> bool {
self.aggregation_bits
.intersection(&other.aggregation_bits)
.is_zero()
}
pub fn aggregate(&mut self, other: &Self) {
self.attesting_indices = self
.attesting_indices
.drain(..)
.merge(other.attesting_indices.iter().copied())
.collect();
self.aggregation_bits = self.aggregation_bits.union(&other.aggregation_bits);
self.signature.add_assign_aggregate(&other.signature);
}
}
impl<T: EthSpec> AttestationMap<T> { impl<T: EthSpec> AttestationMap<T> {
pub fn insert(&mut self, attestation: Attestation<T>, attesting_indices: Vec<u64>) { pub fn insert(&mut self, attestation: Attestation<T>, attesting_indices: Vec<u64>) {
let (checkpoint_key, attestation_data, indexed_attestation) = let (checkpoint_key, attestation_data, indexed_attestation) =
@@ -121,31 +140,22 @@ impl<T: EthSpec> AttestationMap<T> {
.entry(attestation_data) .entry(attestation_data)
.or_insert_with(Vec::new); .or_insert_with(Vec::new);
// FIXME(sproul): do greedy aggregation here // Greedily aggregate the attestation with all existing attestations.
/* // NOTE: this is sub-optimal and in future we will remove this in favour of max-clique
let existing_attestations = match attestations.entry(id) { // aggregation.
Entry::Vacant(entry) => {
entry.insert(vec![attestation]);
return Ok(());
}
Entry::Occupied(entry) => entry.into_mut(),
};
let mut aggregated = false; let mut aggregated = false;
for existing_attestation in existing_attestations.iter_mut() { for existing_attestation in attestations.iter_mut() {
if existing_attestation.signers_disjoint_from(&attestation) { if existing_attestation.signers_disjoint_from(&indexed_attestation) {
existing_attestation.aggregate(&attestation); existing_attestation.aggregate(&indexed_attestation);
aggregated = true; aggregated = true;
} else if *existing_attestation == attestation { } else if *existing_attestation == indexed_attestation {
aggregated = true; aggregated = true;
} }
} }
if !aggregated { if !aggregated {
existing_attestations.push(attestation); attestations.push(indexed_attestation);
} }
*/
attestations.push(indexed_attestation);
} }
pub fn get_attestations<'a>( pub fn get_attestations<'a>(
@@ -154,7 +164,7 @@ impl<T: EthSpec> AttestationMap<T> {
) -> impl Iterator<Item = AttestationRef<'a, T>> + 'a { ) -> impl Iterator<Item = AttestationRef<'a, T>> + 'a {
// It's a monad :O // It's a monad :O
self.checkpoint_map self.checkpoint_map
.get(&checkpoint_key) .get(checkpoint_key)
.into_iter() .into_iter()
.flat_map(|attestation_map| { .flat_map(|attestation_map| {
attestation_map attestation_map

View File

@@ -16,7 +16,6 @@ pub use reward_cache::RewardCache;
use crate::attestation_storage::{AttestationMap, CheckpointKey}; use crate::attestation_storage::{AttestationMap, CheckpointKey};
use crate::sync_aggregate_id::SyncAggregateId; use crate::sync_aggregate_id::SyncAggregateId;
use attestation_id::AttestationId;
use attester_slashing::AttesterSlashingMaxCover; use attester_slashing::AttesterSlashingMaxCover;
use max_cover::maximum_cover; use max_cover::maximum_cover;
use parking_lot::{RwLock, RwLockWriteGuard}; use parking_lot::{RwLock, RwLockWriteGuard};
@@ -234,9 +233,12 @@ impl<T: EthSpec> OperationPool<T> {
validity_filter: impl FnMut(&AttestationRef<'a, T>) -> bool + Send, validity_filter: impl FnMut(&AttestationRef<'a, T>) -> bool + Send,
spec: &'a ChainSpec, spec: &'a ChainSpec,
) -> impl Iterator<Item = AttMaxCover<'a, T>> + Send { ) -> impl Iterator<Item = AttMaxCover<'a, T>> + Send {
// FIXME(sproul): check inclusion slot somewhere
all_attestations all_attestations
.get_attestations(checkpoint_key) .get_attestations(checkpoint_key)
.filter(|att| {
att.data.slot + spec.min_attestation_inclusion_delay <= state.slot()
&& state.slot() <= att.data.slot + T::slots_per_epoch()
})
.filter(validity_filter) .filter(validity_filter)
.filter_map(move |att| { .filter_map(move |att| {
AttMaxCover::new(att, state, reward_cache, total_active_balance, spec) AttMaxCover::new(att, state, reward_cache, total_active_balance, spec)

View File

@@ -1,6 +1,5 @@
use crate::OpPoolError; use crate::OpPoolError;
use bitvec::vec::BitVec; use bitvec::vec::BitVec;
use std::collections::HashMap;
use types::{BeaconState, BeaconStateError, Epoch, EthSpec, Hash256, ParticipationFlags}; use types::{BeaconState, BeaconStateError, Epoch, EthSpec, Hash256, ParticipationFlags};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]

View File

@@ -6,7 +6,7 @@ use types::*;
pub fn get_attesting_indices<T: EthSpec>( pub fn get_attesting_indices<T: EthSpec>(
committee: &[usize], committee: &[usize],
bitlist: &BitList<T::MaxValidatorsPerCommittee>, bitlist: &BitList<T::MaxValidatorsPerCommittee>,
) -> Result<Vec<usize>, BeaconStateError> { ) -> Result<Vec<u64>, BeaconStateError> {
if bitlist.len() != committee.len() { if bitlist.len() != committee.len() {
return Err(BeaconStateError::InvalidBitfield); return Err(BeaconStateError::InvalidBitfield);
} }
@@ -15,7 +15,7 @@ pub fn get_attesting_indices<T: EthSpec>(
for (i, validator_index) in committee.iter().enumerate() { for (i, validator_index) in committee.iter().enumerate() {
if let Ok(true) = bitlist.get(i) { if let Ok(true) = bitlist.get(i) {
indices.push(*validator_index) indices.push(*validator_index as u64)
} }
} }

View File

@@ -14,9 +14,7 @@ pub fn get_indexed_attestation<T: EthSpec>(
let attesting_indices = get_attesting_indices::<T>(committee, &attestation.aggregation_bits)?; let attesting_indices = get_attesting_indices::<T>(committee, &attestation.aggregation_bits)?;
Ok(IndexedAttestation { Ok(IndexedAttestation {
attesting_indices: VariableList::new( attesting_indices: VariableList::new(attesting_indices)?,
attesting_indices.into_iter().map(|x| x as u64).collect(),
)?,
data: attestation.data.clone(), data: attestation.data.clone(),
signature: attestation.signature.clone(), signature: attestation.signature.clone(),
}) })

View File

@@ -278,8 +278,8 @@ impl ValidatorStatuses {
// Loop through the participating validator indices and update the status vec. // Loop through the participating validator indices and update the status vec.
for validator_index in attesting_indices { for validator_index in attesting_indices {
self.statuses self.statuses
.get_mut(validator_index) .get_mut(validator_index as usize)
.ok_or(BeaconStateError::UnknownValidator(validator_index))? .ok_or(BeaconStateError::UnknownValidator(validator_index as usize))?
.update(&status); .update(&status);
} }
} }

View File

@@ -32,8 +32,8 @@ pub fn translate_participation<E: EthSpec>(
for index in attesting_indices { for index in attesting_indices {
for flag_index in &participation_flag_indices { for flag_index in &participation_flag_indices {
epoch_participation epoch_participation
.get_mut(index) .get_mut(index as usize)
.ok_or(Error::UnknownValidator(index))? .ok_or(Error::UnknownValidator(index as usize))?
.add_flag(*flag_index)?; .add_flag(*flag_index)?;
} }
} }