diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7ea48a2fa4..16820f08f3 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -69,9 +69,12 @@ use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; use ssz::Encode; use state_processing::{ - common::get_indexed_attestation, + common::{get_attesting_indices, get_indexed_attestation}, per_block_processing, - per_block_processing::errors::AttestationValidationError, + per_block_processing::{ + errors::AttestationValidationError, verify_attestation_for_block_inclusion, + VerifySignatures, + }, per_slot_processing, state_advance::{complete_state_advance, partial_state_advance}, BlockSignatureStrategy, SigVerifiedOp, VerifyBlockRoot, @@ -3276,8 +3279,17 @@ impl BeaconChain { let unagg_import_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_UNAGGREGATED_TIMES); for attestation in self.naive_aggregation_pool.read().iter() { - // FIXME(sproul): put correct attesting indices - if let Err(e) = self.op_pool.insert_attestation(attestation.clone(), vec![]) { + let import = |attestation: &Attestation| { + let committee = + state.get_beacon_committee(attestation.data.slot, attestation.data.index)?; + let attesting_indices = get_attesting_indices::( + committee.committee, + &attestation.aggregation_bits, + )?; + self.op_pool + .insert_attestation(attestation.clone(), attesting_indices) + }; + if let Err(e) = import(attestation) { // Don't stop block production if there's an error, just create a log. error!( self.log, @@ -3306,7 +3318,7 @@ impl BeaconChain { self.filter_op_pool_attestation(&mut curr_filter_cache, att, &state) }; - let attestations = self + let mut attestations = self .op_pool .get_attestations( &state, @@ -3315,6 +3327,33 @@ impl BeaconChain { &self.spec, ) .map_err(BlockProductionError::OpPoolError)?; + + let paranoid = false; + let ultra_paranoid = true; + + if paranoid { + let verify_sigs = if ultra_paranoid { + VerifySignatures::True + } else { + VerifySignatures::False + }; + attestations.retain(|att| { + let res = + verify_attestation_for_block_inclusion(&state, att, verify_sigs, &self.spec); + if let Err(e) = res { + error!( + self.log, + "Attempted to include an invalid attestation"; + "err" => ?e, + "block_slot" => state.slot(), + ); + assert!(false); + false + } else { + true + } + }); + } drop(attestation_packing_timer); let slot = state.slot(); diff --git a/beacon_node/operation_pool/src/attestation_storage.rs b/beacon_node/operation_pool/src/attestation_storage.rs index 247645e810..7bda1445a9 100644 --- a/beacon_node/operation_pool/src/attestation_storage.rs +++ b/beacon_node/operation_pool/src/attestation_storage.rs @@ -1,3 +1,4 @@ +use itertools::Itertools; use std::collections::HashMap; use types::{ AggregateSignature, Attestation, AttestationData, BeaconState, BitList, Checkpoint, Epoch, @@ -18,7 +19,7 @@ pub struct CompactAttestationData { pub target_root: Hash256, } -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub struct CompactIndexedAttestation { pub attesting_indices: Vec, pub aggregation_bits: BitList, @@ -107,6 +108,24 @@ impl CheckpointKey { } } +impl CompactIndexedAttestation { + pub fn signers_disjoint_from(&self, other: &Self) -> bool { + self.aggregation_bits + .intersection(&other.aggregation_bits) + .is_zero() + } + + pub fn aggregate(&mut self, other: &Self) { + self.attesting_indices = self + .attesting_indices + .drain(..) + .merge(other.attesting_indices.iter().copied()) + .collect(); + self.aggregation_bits = self.aggregation_bits.union(&other.aggregation_bits); + self.signature.add_assign_aggregate(&other.signature); + } +} + impl AttestationMap { pub fn insert(&mut self, attestation: Attestation, attesting_indices: Vec) { let (checkpoint_key, attestation_data, indexed_attestation) = @@ -121,31 +140,22 @@ impl AttestationMap { .entry(attestation_data) .or_insert_with(Vec::new); - // FIXME(sproul): do greedy aggregation here - /* - let existing_attestations = match attestations.entry(id) { - Entry::Vacant(entry) => { - entry.insert(vec![attestation]); - return Ok(()); - } - Entry::Occupied(entry) => entry.into_mut(), - }; - + // Greedily aggregate the attestation with all existing attestations. + // NOTE: this is sub-optimal and in future we will remove this in favour of max-clique + // aggregation. let mut aggregated = false; - for existing_attestation in existing_attestations.iter_mut() { - if existing_attestation.signers_disjoint_from(&attestation) { - existing_attestation.aggregate(&attestation); + for existing_attestation in attestations.iter_mut() { + if existing_attestation.signers_disjoint_from(&indexed_attestation) { + existing_attestation.aggregate(&indexed_attestation); aggregated = true; - } else if *existing_attestation == attestation { + } else if *existing_attestation == indexed_attestation { aggregated = true; } } if !aggregated { - existing_attestations.push(attestation); + attestations.push(indexed_attestation); } - */ - attestations.push(indexed_attestation); } pub fn get_attestations<'a>( @@ -154,7 +164,7 @@ impl AttestationMap { ) -> impl Iterator> + 'a { // It's a monad :O self.checkpoint_map - .get(&checkpoint_key) + .get(checkpoint_key) .into_iter() .flat_map(|attestation_map| { attestation_map diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index 76f9b2e872..4e180db6f4 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -16,7 +16,6 @@ pub use reward_cache::RewardCache; use crate::attestation_storage::{AttestationMap, CheckpointKey}; use crate::sync_aggregate_id::SyncAggregateId; -use attestation_id::AttestationId; use attester_slashing::AttesterSlashingMaxCover; use max_cover::maximum_cover; use parking_lot::{RwLock, RwLockWriteGuard}; @@ -234,9 +233,12 @@ impl OperationPool { validity_filter: impl FnMut(&AttestationRef<'a, T>) -> bool + Send, spec: &'a ChainSpec, ) -> impl Iterator> + Send { - // FIXME(sproul): check inclusion slot somewhere all_attestations .get_attestations(checkpoint_key) + .filter(|att| { + att.data.slot + spec.min_attestation_inclusion_delay <= state.slot() + && state.slot() <= att.data.slot + T::slots_per_epoch() + }) .filter(validity_filter) .filter_map(move |att| { AttMaxCover::new(att, state, reward_cache, total_active_balance, spec) diff --git a/beacon_node/operation_pool/src/reward_cache.rs b/beacon_node/operation_pool/src/reward_cache.rs index 2d2d7d9d88..b278cb14f0 100644 --- a/beacon_node/operation_pool/src/reward_cache.rs +++ b/beacon_node/operation_pool/src/reward_cache.rs @@ -1,6 +1,5 @@ use crate::OpPoolError; use bitvec::vec::BitVec; -use std::collections::HashMap; use types::{BeaconState, BeaconStateError, Epoch, EthSpec, Hash256, ParticipationFlags}; #[derive(Debug, Clone)] diff --git a/consensus/state_processing/src/common/get_attesting_indices.rs b/consensus/state_processing/src/common/get_attesting_indices.rs index fb636f861e..3367b8b064 100644 --- a/consensus/state_processing/src/common/get_attesting_indices.rs +++ b/consensus/state_processing/src/common/get_attesting_indices.rs @@ -6,7 +6,7 @@ use types::*; pub fn get_attesting_indices( committee: &[usize], bitlist: &BitList, -) -> Result, BeaconStateError> { +) -> Result, BeaconStateError> { if bitlist.len() != committee.len() { return Err(BeaconStateError::InvalidBitfield); } @@ -15,7 +15,7 @@ pub fn get_attesting_indices( for (i, validator_index) in committee.iter().enumerate() { if let Ok(true) = bitlist.get(i) { - indices.push(*validator_index) + indices.push(*validator_index as u64) } } diff --git a/consensus/state_processing/src/common/get_indexed_attestation.rs b/consensus/state_processing/src/common/get_indexed_attestation.rs index daa1c09307..63f63698e4 100644 --- a/consensus/state_processing/src/common/get_indexed_attestation.rs +++ b/consensus/state_processing/src/common/get_indexed_attestation.rs @@ -14,9 +14,7 @@ pub fn get_indexed_attestation( let attesting_indices = get_attesting_indices::(committee, &attestation.aggregation_bits)?; Ok(IndexedAttestation { - attesting_indices: VariableList::new( - attesting_indices.into_iter().map(|x| x as u64).collect(), - )?, + attesting_indices: VariableList::new(attesting_indices)?, data: attestation.data.clone(), signature: attestation.signature.clone(), }) diff --git a/consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs b/consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs index b40f91ce5a..26d2536e5f 100644 --- a/consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs +++ b/consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs @@ -278,8 +278,8 @@ impl ValidatorStatuses { // Loop through the participating validator indices and update the status vec. for validator_index in attesting_indices { self.statuses - .get_mut(validator_index) - .ok_or(BeaconStateError::UnknownValidator(validator_index))? + .get_mut(validator_index as usize) + .ok_or(BeaconStateError::UnknownValidator(validator_index as usize))? .update(&status); } } diff --git a/consensus/state_processing/src/upgrade/altair.rs b/consensus/state_processing/src/upgrade/altair.rs index 5e4fcbcf55..176f1af15c 100644 --- a/consensus/state_processing/src/upgrade/altair.rs +++ b/consensus/state_processing/src/upgrade/altair.rs @@ -32,8 +32,8 @@ pub fn translate_participation( for index in attesting_indices { for flag_index in &participation_flag_indices { epoch_participation - .get_mut(index) - .ok_or(Error::UnknownValidator(index))? + .get_mut(index as usize) + .ok_or(Error::UnknownValidator(index as usize))? .add_flag(*flag_index)?; } }