diff --git a/Cargo.lock b/Cargo.lock index 5a2c4312b1..46cd2d96f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4410,12 +4410,14 @@ name = "operation_pool" version = "0.2.0" dependencies = [ "beacon_chain", + "bitvec 1.0.1", "derivative", "eth2_ssz", "eth2_ssz_derive", "itertools", "lazy_static", "lighthouse_metrics", + "maplit", "parking_lot 0.12.1", "rayon", "serde", @@ -6271,9 +6273,11 @@ dependencies = [ "arbitrary", "beacon_chain", "bls", + "derivative", "env_logger 0.9.0", "eth2_hashing", "eth2_ssz", + "eth2_ssz_derive", "eth2_ssz_types", "int_to_bytes", "integer-sqrt", diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index 63af6ab9e1..b60ce7efe5 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -318,10 +318,17 @@ impl<'a, T: BeaconChainTypes> Clone for IndexedUnaggregatedAttestation<'a, T> { /// A helper trait implemented on wrapper types that can be progressed to a state where they can be /// verified for application to fork choice. -pub trait VerifiedAttestation { +pub trait VerifiedAttestation: Sized { fn attestation(&self) -> &Attestation; fn indexed_attestation(&self) -> &IndexedAttestation; + + // Inefficient default implementation. This is overridden for gossip verified attestations. + fn into_attestation_and_indices(self) -> (Attestation, Vec) { + let attestation = self.attestation().clone(); + let attesting_indices = self.indexed_attestation().attesting_indices.clone().into(); + (attestation, attesting_indices) + } } impl<'a, T: BeaconChainTypes> VerifiedAttestation for VerifiedAggregatedAttestation<'a, T> { diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 4d37926dd9..fdcd3eed88 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -63,7 +63,7 @@ use fork_choice::{ use futures::channel::mpsc::Sender; use itertools::process_results; use itertools::Itertools; -use operation_pool::{OperationPool, PersistedOperationPool}; +use operation_pool::{AttestationRef, OperationPool, PersistedOperationPool}; use parking_lot::{Mutex, RwLock}; use safe_arith::SafeArith; use slasher::Slasher; @@ -71,12 +71,15 @@ use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; use ssz::Encode; use state_processing::{ - common::get_indexed_attestation, + common::{get_attesting_indices_from_state, get_indexed_attestation}, per_block_processing, - per_block_processing::errors::AttestationValidationError, + per_block_processing::{ + errors::AttestationValidationError, verify_attestation_for_block_inclusion, + VerifySignatures, + }, per_slot_processing, state_advance::{complete_state_advance, partial_state_advance}, - BlockSignatureStrategy, SigVerifiedOp, VerifyBlockRoot, + BlockSignatureStrategy, SigVerifiedOp, VerifyBlockRoot, VerifyOperation, }; use std::cmp::Ordering; use std::collections::HashMap; @@ -1904,25 +1907,22 @@ impl BeaconChain { /// Accepts a `VerifiedAttestation` and attempts to apply it to `self.op_pool`. /// /// The op pool is used by local block producers to pack blocks with operations. - pub fn add_to_block_inclusion_pool( + pub fn add_to_block_inclusion_pool( &self, - verified_attestation: &impl VerifiedAttestation, - ) -> Result<(), AttestationError> { + verified_attestation: A, + ) -> Result<(), AttestationError> + where + A: VerifiedAttestation, + { let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_APPLY_TO_OP_POOL); // If there's no eth1 chain then it's impossible to produce blocks and therefore // useless to put things in the op pool. if self.eth1_chain.is_some() { - let fork = self.canonical_head.cached_head().head_fork(); - + let (attestation, attesting_indices) = + verified_attestation.into_attestation_and_indices(); self.op_pool - .insert_attestation( - // TODO: address this clone. - verified_attestation.attestation().clone(), - &fork, - self.genesis_validators_root, - &self.spec, - ) + .insert_attestation(attestation, attesting_indices) .map_err(Error::from)?; } @@ -1955,15 +1955,15 @@ impl BeaconChain { pub fn filter_op_pool_attestation( &self, filter_cache: &mut HashMap<(Hash256, Epoch), bool>, - att: &Attestation, + att: &AttestationRef, state: &BeaconState, ) -> bool { *filter_cache - .entry((att.data.beacon_block_root, att.data.target.epoch)) + .entry((att.data.beacon_block_root, att.checkpoint.target_epoch)) .or_insert_with(|| { self.shuffling_is_compatible( &att.data.beacon_block_root, - att.data.target.epoch, + att.checkpoint.target_epoch, state, ) }) @@ -2045,7 +2045,7 @@ impl BeaconChain { pub fn verify_voluntary_exit_for_gossip( &self, exit: SignedVoluntaryExit, - ) -> Result, Error> { + ) -> Result, Error> { // NOTE: this could be more efficient if it avoided cloning the head state let wall_clock_state = self.wall_clock_state()?; Ok(self @@ -2066,7 +2066,7 @@ impl BeaconChain { } /// Accept a pre-verified exit and queue it for inclusion in an appropriate block. - pub fn import_voluntary_exit(&self, exit: SigVerifiedOp) { + pub fn import_voluntary_exit(&self, exit: SigVerifiedOp) { if self.eth1_chain.is_some() { self.op_pool.insert_voluntary_exit(exit) } @@ -2076,7 +2076,7 @@ impl BeaconChain { pub fn verify_proposer_slashing_for_gossip( &self, proposer_slashing: ProposerSlashing, - ) -> Result, Error> { + ) -> Result, Error> { let wall_clock_state = self.wall_clock_state()?; Ok(self.observed_proposer_slashings.lock().verify_and_observe( proposer_slashing, @@ -2086,7 +2086,10 @@ impl BeaconChain { } /// Accept some proposer slashing and queue it for inclusion in an appropriate block. - pub fn import_proposer_slashing(&self, proposer_slashing: SigVerifiedOp) { + pub fn import_proposer_slashing( + &self, + proposer_slashing: SigVerifiedOp, + ) { if self.eth1_chain.is_some() { self.op_pool.insert_proposer_slashing(proposer_slashing) } @@ -2096,7 +2099,7 @@ impl BeaconChain { pub fn verify_attester_slashing_for_gossip( &self, attester_slashing: AttesterSlashing, - ) -> Result>, Error> { + ) -> Result, T::EthSpec>, Error> { let wall_clock_state = self.wall_clock_state()?; Ok(self.observed_attester_slashings.lock().verify_and_observe( attester_slashing, @@ -2111,7 +2114,7 @@ impl BeaconChain { /// 2. Add it to the op pool. pub fn import_attester_slashing( &self, - attester_slashing: SigVerifiedOp>, + attester_slashing: SigVerifiedOp, T::EthSpec>, ) { // Add to fork choice. self.canonical_head @@ -2120,10 +2123,7 @@ impl BeaconChain { // Add to the op pool (if we have the ability to propose blocks). if self.eth1_chain.is_some() { - self.op_pool.insert_attester_slashing( - attester_slashing, - self.canonical_head.cached_head().head_fork(), - ) + self.op_pool.insert_attester_slashing(attester_slashing) } } @@ -3351,7 +3351,7 @@ impl BeaconChain { } }; - let (proposer_slashings, attester_slashings, voluntary_exits) = + let (mut proposer_slashings, mut attester_slashings, mut voluntary_exits) = self.op_pool.get_slashings_and_exits(&state, &self.spec); let eth1_data = eth1_chain.eth1_data_for_block_production(&state, &self.spec)?; @@ -3362,12 +3362,12 @@ impl BeaconChain { let unagg_import_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_UNAGGREGATED_TIMES); for attestation in self.naive_aggregation_pool.read().iter() { - if let Err(e) = self.op_pool.insert_attestation( - attestation.clone(), - &state.fork(), - state.genesis_validators_root(), - &self.spec, - ) { + let import = |attestation: &Attestation| { + let attesting_indices = get_attesting_indices_from_state(&state, attestation)?; + self.op_pool + .insert_attestation(attestation.clone(), attesting_indices) + }; + if let Err(e) = import(attestation) { // Don't stop block production if there's an error, just create a log. error!( self.log, @@ -3388,15 +3388,15 @@ impl BeaconChain { metrics::start_timer(&metrics::BLOCK_PRODUCTION_ATTESTATION_TIMES); let mut prev_filter_cache = HashMap::new(); - let prev_attestation_filter = |att: &&Attestation| { - self.filter_op_pool_attestation(&mut prev_filter_cache, *att, &state) + let prev_attestation_filter = |att: &AttestationRef| { + self.filter_op_pool_attestation(&mut prev_filter_cache, att, &state) }; let mut curr_filter_cache = HashMap::new(); - let curr_attestation_filter = |att: &&Attestation| { - self.filter_op_pool_attestation(&mut curr_filter_cache, *att, &state) + let curr_attestation_filter = |att: &AttestationRef| { + self.filter_op_pool_attestation(&mut curr_filter_cache, att, &state) }; - let attestations = self + let mut attestations = self .op_pool .get_attestations( &state, @@ -3407,6 +3407,77 @@ impl BeaconChain { .map_err(BlockProductionError::OpPoolError)?; drop(attestation_packing_timer); + // If paranoid mode is enabled re-check the signatures of every included message. + // This will be a lot slower but guards against bugs in block production and can be + // quickly rolled out without a release. + if self.config.paranoid_block_proposal { + attestations.retain(|att| { + verify_attestation_for_block_inclusion( + &state, + att, + VerifySignatures::True, + &self.spec, + ) + .map_err(|e| { + warn!( + self.log, + "Attempted to include an invalid attestation"; + "err" => ?e, + "block_slot" => state.slot(), + "attestation" => ?att + ); + }) + .is_ok() + }); + + proposer_slashings.retain(|slashing| { + slashing + .clone() + .validate(&state, &self.spec) + .map_err(|e| { + warn!( + self.log, + "Attempted to include an invalid proposer slashing"; + "err" => ?e, + "block_slot" => state.slot(), + "slashing" => ?slashing + ); + }) + .is_ok() + }); + + attester_slashings.retain(|slashing| { + slashing + .clone() + .validate(&state, &self.spec) + .map_err(|e| { + warn!( + self.log, + "Attempted to include an invalid attester slashing"; + "err" => ?e, + "block_slot" => state.slot(), + "slashing" => ?slashing + ); + }) + .is_ok() + }); + + voluntary_exits.retain(|exit| { + exit.clone() + .validate(&state, &self.spec) + .map_err(|e| { + warn!( + self.log, + "Attempted to include an invalid proposer slashing"; + "err" => ?e, + "block_slot" => state.slot(), + "exit" => ?exit + ); + }) + .is_ok() + }); + } + let slot = state.slot(); let proposer_index = state.get_beacon_proposer_index(state.slot(), &self.spec)? as u64; diff --git a/beacon_node/beacon_chain/src/block_reward.rs b/beacon_node/beacon_chain/src/block_reward.rs index 4b8b809d3f..3bddd2a521 100644 --- a/beacon_node/beacon_chain/src/block_reward.rs +++ b/beacon_node/beacon_chain/src/block_reward.rs @@ -1,7 +1,10 @@ use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2::lighthouse::{AttestationRewards, BlockReward, BlockRewardMeta}; -use operation_pool::{AttMaxCover, MaxCover}; -use state_processing::per_block_processing::altair::sync_committee::compute_sync_aggregate_rewards; +use operation_pool::{AttMaxCover, MaxCover, RewardCache, SplitAttestation}; +use state_processing::{ + common::get_attesting_indices_from_state, + per_block_processing::altair::sync_committee::compute_sync_aggregate_rewards, +}; use types::{BeaconBlockRef, BeaconState, EthSpec, ExecPayload, Hash256}; impl BeaconChain { @@ -10,20 +13,38 @@ impl BeaconChain { block: BeaconBlockRef<'_, T::EthSpec, Payload>, block_root: Hash256, state: &BeaconState, + reward_cache: &mut RewardCache, include_attestations: bool, ) -> Result { if block.slot() != state.slot() { return Err(BeaconChainError::BlockRewardSlotError); } + reward_cache.update(state)?; + let total_active_balance = state.get_total_active_balance()?; - let mut per_attestation_rewards = block + + let split_attestations = block .body() .attestations() .iter() .map(|att| { - AttMaxCover::new(att, state, total_active_balance, &self.spec) - .ok_or(BeaconChainError::BlockRewardAttestationError) + let attesting_indices = get_attesting_indices_from_state(state, att)?; + Ok(SplitAttestation::new(att.clone(), attesting_indices)) + }) + .collect::, BeaconChainError>>()?; + + let mut per_attestation_rewards = split_attestations + .iter() + .map(|att| { + AttMaxCover::new( + att.as_ref(), + state, + reward_cache, + total_active_balance, + &self.spec, + ) + .ok_or(BeaconChainError::BlockRewardAttestationError) }) .collect::, _>>()?; @@ -34,7 +55,7 @@ impl BeaconChain { let latest_att = &updated[i]; for att in to_update { - att.update_covering_set(latest_att.object(), latest_att.covering_set()); + att.update_covering_set(latest_att.intermediate(), latest_att.covering_set()); } } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 95d5f818f0..cdcbf3f68e 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1307,8 +1307,14 @@ impl ExecutionPendingBlock { */ if let Some(ref event_handler) = chain.event_handler { if event_handler.has_block_reward_subscribers() { - let block_reward = - chain.compute_block_reward(block.message(), block_root, &state, true)?; + let mut reward_cache = Default::default(); + let block_reward = chain.compute_block_reward( + block.message(), + block_root, + &state, + &mut reward_cache, + true, + )?; event_handler.register(EventKind::BlockReward(block_reward)); } } diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index aa7ff02af1..ba3a0b628c 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -35,6 +35,8 @@ pub struct ChainConfig { /// Whether any chain health checks should be considered when deciding whether to use the builder API. pub builder_fallback_disable_checks: bool, pub count_unrealized: bool, + /// Whether to apply paranoid checks to blocks proposed by this beacon node. + pub paranoid_block_proposal: bool, } impl Default for ChainConfig { @@ -52,6 +54,7 @@ impl Default for ChainConfig { builder_fallback_epochs_since_finalization: 3, builder_fallback_disable_checks: false, count_unrealized: true, + paranoid_block_proposal: false, } } } diff --git a/beacon_node/beacon_chain/src/observed_operations.rs b/beacon_node/beacon_chain/src/observed_operations.rs index f1eb996a54..8d8272b67d 100644 --- a/beacon_node/beacon_chain/src/observed_operations.rs +++ b/beacon_node/beacon_chain/src/observed_operations.rs @@ -1,10 +1,12 @@ use derivative::Derivative; use smallvec::SmallVec; +use ssz::{Decode, Encode}; use state_processing::{SigVerifiedOp, VerifyOperation}; use std::collections::HashSet; use std::marker::PhantomData; use types::{ - AttesterSlashing, BeaconState, ChainSpec, EthSpec, ProposerSlashing, SignedVoluntaryExit, + AttesterSlashing, BeaconState, ChainSpec, EthSpec, ForkName, ProposerSlashing, + SignedVoluntaryExit, Slot, }; /// Number of validator indices to store on the stack in `observed_validators`. @@ -24,13 +26,16 @@ pub struct ObservedOperations, E: EthSpec> { /// previously seen attester slashings, i.e. those validators in the intersection of /// `attestation_1.attester_indices` and `attestation_2.attester_indices`. observed_validator_indices: HashSet, + /// The name of the current fork. The default will be overwritten on first use. + #[derivative(Default(value = "ForkName::Base"))] + current_fork: ForkName, _phantom: PhantomData<(T, E)>, } /// Was the observed operation new and valid for further processing, or a useless duplicate? #[derive(Debug, PartialEq, Eq, Clone)] -pub enum ObservationOutcome { - New(SigVerifiedOp), +pub enum ObservationOutcome { + New(SigVerifiedOp), AlreadyKnown, } @@ -81,7 +86,9 @@ impl, E: EthSpec> ObservedOperations { op: T, head_state: &BeaconState, spec: &ChainSpec, - ) -> Result, T::Error> { + ) -> Result, T::Error> { + self.reset_at_fork_boundary(head_state.slot(), spec); + let observed_validator_indices = &mut self.observed_validator_indices; let new_validator_indices = op.observed_validators(); @@ -107,4 +114,23 @@ impl, E: EthSpec> ObservedOperations { Ok(ObservationOutcome::New(verified_op)) } + + /// Reset the cache when crossing a fork boundary. + /// + /// This prevents an attacker from crafting a self-slashing which is only valid before the fork + /// (e.g. using the Altair fork domain at a Bellatrix epoch), in order to prevent propagation of + /// all other slashings due to the duplicate check. + /// + /// It doesn't matter if this cache gets reset too often, as we reset it on restart anyway and a + /// false negative just results in propagation of messages which should have been ignored. + /// + /// In future we could check slashing relevance against the op pool itself, but that would + /// require indexing the attester slashings in the op pool by validator index. + fn reset_at_fork_boundary(&mut self, head_slot: Slot, spec: &ChainSpec) { + let head_fork = spec.fork_name_at_slot::(head_slot); + if head_fork != self.current_fork { + self.observed_validator_indices.clear(); + self.current_fork = head_fork; + } + } } diff --git a/beacon_node/beacon_chain/src/schema_change.rs b/beacon_node/beacon_chain/src/schema_change.rs index b6c70b5435..15b0f39f3a 100644 --- a/beacon_node/beacon_chain/src/schema_change.rs +++ b/beacon_node/beacon_chain/src/schema_change.rs @@ -1,6 +1,7 @@ //! Utilities for managing database schema changes. mod migration_schema_v10; mod migration_schema_v11; +mod migration_schema_v12; mod migration_schema_v6; mod migration_schema_v7; mod migration_schema_v8; @@ -196,6 +197,16 @@ pub fn migrate_schema( Ok(()) } + // Upgrade from v11 to v12 to store richer metadata in the attestation op pool. + (SchemaVersion(11), SchemaVersion(12)) => { + let ops = migration_schema_v12::upgrade_to_v12::(db.clone(), log)?; + db.store_schema_version_atomically(to, ops) + } + // Downgrade from v12 to v11 to drop richer metadata from the attestation op pool. + (SchemaVersion(12), SchemaVersion(11)) => { + let ops = migration_schema_v12::downgrade_from_v12::(db.clone(), log)?; + db.store_schema_version_atomically(to, ops) + } // Anything else is an error. (_, _) => Err(HotColdDBError::UnsupportedSchemaVersion { target_version: to, diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v12.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v12.rs new file mode 100644 index 0000000000..bb72b28c0e --- /dev/null +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v12.rs @@ -0,0 +1,226 @@ +use crate::beacon_chain::{BeaconChainTypes, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY}; +use crate::persisted_fork_choice::PersistedForkChoiceV11; +use operation_pool::{PersistedOperationPool, PersistedOperationPoolV12, PersistedOperationPoolV5}; +use slog::{debug, info, Logger}; +use state_processing::{ + common::get_indexed_attestation, per_block_processing::is_valid_indexed_attestation, + VerifyOperation, VerifySignatures, +}; +use std::sync::Arc; +use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem}; + +pub fn upgrade_to_v12( + db: Arc>, + log: Logger, +) -> Result, Error> { + let spec = db.get_chain_spec(); + + // Load a V5 op pool and transform it to V12. + let PersistedOperationPoolV5 { + attestations_v5, + sync_contributions, + attester_slashings_v5, + proposer_slashings_v5, + voluntary_exits_v5, + } = if let Some(op_pool) = db.get_item(&OP_POOL_DB_KEY)? { + op_pool + } else { + debug!(log, "Nothing to do, no operation pool stored"); + return Ok(vec![]); + }; + + // Load the persisted fork choice so we can grab the state of the justified block and use + // it to verify the stored attestations, slashings and exits. + let fork_choice = db + .get_item::(&FORK_CHOICE_DB_KEY)? + .ok_or_else(|| Error::SchemaMigrationError("fork choice missing from database".into()))?; + let justified_block_root = fork_choice + .fork_choice_store + .unrealized_justified_checkpoint + .root; + let justified_block = db + .get_blinded_block(&justified_block_root)? + .ok_or_else(|| { + Error::SchemaMigrationError(format!( + "unrealized justified block missing for migration: {justified_block_root:?}", + )) + })?; + let justified_state_root = justified_block.state_root(); + let mut state = db + .get_state(&justified_state_root, Some(justified_block.slot()))? + .ok_or_else(|| { + Error::SchemaMigrationError(format!( + "justified state missing for migration: {justified_state_root:?}" + )) + })?; + state.build_all_committee_caches(spec).map_err(|e| { + Error::SchemaMigrationError(format!("unable to build committee caches: {e:?}")) + })?; + + // Re-verify attestations while adding attesting indices. + let attestations = attestations_v5 + .into_iter() + .flat_map(|(_, attestations)| attestations) + .filter_map(|attestation| { + let res = state + .get_beacon_committee(attestation.data.slot, attestation.data.index) + .map_err(Into::into) + .and_then(|committee| get_indexed_attestation(committee.committee, &attestation)) + .and_then(|indexed_attestation| { + is_valid_indexed_attestation( + &state, + &indexed_attestation, + VerifySignatures::True, + spec, + )?; + Ok(indexed_attestation) + }); + + match res { + Ok(indexed) => Some((attestation, indexed.attesting_indices.into())), + Err(e) => { + debug!( + log, + "Dropping attestation on migration"; + "err" => ?e, + "head_block" => ?attestation.data.beacon_block_root, + ); + None + } + } + }) + .collect::>(); + + let attester_slashings = attester_slashings_v5 + .iter() + .filter_map(|(slashing, _)| { + slashing + .clone() + .validate(&state, spec) + .map_err(|e| { + debug!( + log, + "Dropping attester slashing on migration"; + "err" => ?e, + "slashing" => ?slashing, + ); + }) + .ok() + }) + .collect::>(); + + let proposer_slashings = proposer_slashings_v5 + .iter() + .filter_map(|slashing| { + slashing + .clone() + .validate(&state, spec) + .map_err(|e| { + debug!( + log, + "Dropping proposer slashing on migration"; + "err" => ?e, + "slashing" => ?slashing, + ); + }) + .ok() + }) + .collect::>(); + + let voluntary_exits = voluntary_exits_v5 + .iter() + .filter_map(|exit| { + exit.clone() + .validate(&state, spec) + .map_err(|e| { + debug!( + log, + "Dropping voluntary exit on migration"; + "err" => ?e, + "exit" => ?exit, + ); + }) + .ok() + }) + .collect::>(); + + debug!( + log, + "Migrated op pool"; + "attestations" => attestations.len(), + "attester_slashings" => attester_slashings.len(), + "proposer_slashings" => proposer_slashings.len(), + "voluntary_exits" => voluntary_exits.len() + ); + + let v12 = PersistedOperationPool::V12(PersistedOperationPoolV12 { + attestations, + sync_contributions, + attester_slashings, + proposer_slashings, + voluntary_exits, + }); + Ok(vec![v12.as_kv_store_op(OP_POOL_DB_KEY)]) +} + +pub fn downgrade_from_v12( + db: Arc>, + log: Logger, +) -> Result, Error> { + // Load a V12 op pool and transform it to V5. + let PersistedOperationPoolV12 { + attestations, + sync_contributions, + attester_slashings, + proposer_slashings, + voluntary_exits, + } = if let Some(PersistedOperationPool::::V12(op_pool)) = + db.get_item(&OP_POOL_DB_KEY)? + { + op_pool + } else { + debug!(log, "Nothing to do, no operation pool stored"); + return Ok(vec![]); + }; + + info!( + log, + "Dropping attestations from pool"; + "count" => attestations.len(), + ); + + let attester_slashings_v5 = attester_slashings + .into_iter() + .filter_map(|slashing| { + let fork_version = slashing.first_fork_verified_against()?; + Some((slashing.into_inner(), fork_version)) + }) + .collect::>(); + + let proposer_slashings_v5 = proposer_slashings + .into_iter() + .map(|slashing| slashing.into_inner()) + .collect::>(); + + let voluntary_exits_v5 = voluntary_exits + .into_iter() + .map(|exit| exit.into_inner()) + .collect::>(); + + info!( + log, + "Migrated slashings and exits"; + "attester_slashings" => attester_slashings_v5.len(), + "proposer_slashings" => proposer_slashings_v5.len(), + "voluntary_exits" => voluntary_exits_v5.len(), + ); + + let v5 = PersistedOperationPoolV5 { + attestations_v5: vec![], + sync_contributions, + attester_slashings_v5, + proposer_slashings_v5, + voluntary_exits_v5, + }; + Ok(vec![v5.as_kv_store_op(OP_POOL_DB_KEY)]) +} diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 9b62590703..a62608202e 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -1175,6 +1175,19 @@ where } pub fn make_attester_slashing(&self, validator_indices: Vec) -> AttesterSlashing { + self.make_attester_slashing_with_epochs(validator_indices, None, None, None, None) + } + + pub fn make_attester_slashing_with_epochs( + &self, + validator_indices: Vec, + source1: Option, + target1: Option, + source2: Option, + target2: Option, + ) -> AttesterSlashing { + let fork = self.chain.canonical_head.cached_head().head_fork(); + let mut attestation_1 = IndexedAttestation { attesting_indices: VariableList::new(validator_indices).unwrap(), data: AttestationData { @@ -1183,11 +1196,11 @@ where beacon_block_root: Hash256::zero(), target: Checkpoint { root: Hash256::zero(), - epoch: Epoch::new(0), + epoch: target1.unwrap_or(fork.epoch), }, source: Checkpoint { root: Hash256::zero(), - epoch: Epoch::new(0), + epoch: source1.unwrap_or(Epoch::new(0)), }, }, signature: AggregateSignature::infinity(), @@ -1195,8 +1208,9 @@ where let mut attestation_2 = attestation_1.clone(); attestation_2.data.index += 1; + attestation_2.data.source.epoch = source2.unwrap_or(Epoch::new(0)); + attestation_2.data.target.epoch = target2.unwrap_or(fork.epoch); - let fork = self.chain.canonical_head.cached_head().head_fork(); for attestation in &mut [&mut attestation_1, &mut attestation_2] { for &i in &attestation.attesting_indices { let sk = &self.validator_keypairs[i as usize].sk; @@ -1280,8 +1294,19 @@ where } pub fn make_proposer_slashing(&self, validator_index: u64) -> ProposerSlashing { + self.make_proposer_slashing_at_slot(validator_index, None) + } + + pub fn make_proposer_slashing_at_slot( + &self, + validator_index: u64, + slot_override: Option, + ) -> ProposerSlashing { let mut block_header_1 = self.chain.head_beacon_block().message().block_header(); block_header_1.proposer_index = validator_index; + if let Some(slot) = slot_override { + block_header_1.slot = slot; + } let mut block_header_2 = block_header_1.clone(); block_header_2.state_root = Hash256::zero(); @@ -1488,7 +1513,7 @@ where self.chain .apply_attestation_to_fork_choice(&verified) .unwrap(); - self.chain.add_to_block_inclusion_pool(&verified).unwrap(); + self.chain.add_to_block_inclusion_pool(verified).unwrap(); } } diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index d9d5ca20d7..afd97750a6 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -795,9 +795,7 @@ async fn multiple_attestations_per_block() { snapshot .beacon_block .as_ref() - .clone() - .deconstruct() - .0 + .message() .body() .attestations() .len() as u64, diff --git a/beacon_node/http_api/src/block_rewards.rs b/beacon_node/http_api/src/block_rewards.rs index 682828aee4..3b81b894db 100644 --- a/beacon_node/http_api/src/block_rewards.rs +++ b/beacon_node/http_api/src/block_rewards.rs @@ -52,6 +52,7 @@ pub fn get_block_rewards( .build_all_caches(&chain.spec) .map_err(beacon_state_error)?; + let mut reward_cache = Default::default(); let mut block_rewards = Vec::with_capacity(blocks.len()); let block_replayer = BlockReplayer::new(state, &chain.spec) @@ -63,6 +64,7 @@ pub fn get_block_rewards( block.message(), block.canonical_root(), state, + &mut reward_cache, query.include_attestations, )?; block_rewards.push(block_reward); @@ -100,6 +102,7 @@ pub fn compute_block_rewards( ) -> Result, warp::Rejection> { let mut block_rewards = Vec::with_capacity(blocks.len()); let mut state_cache = LruCache::new(STATE_CACHE_SIZE); + let mut reward_cache = Default::default(); for block in blocks { let parent_root = block.parent_root(); @@ -170,7 +173,13 @@ pub fn compute_block_rewards( // Compute block reward. let block_reward = chain - .compute_block_reward(block.to_ref(), block.canonical_root(), state, true) + .compute_block_reward( + block.to_ref(), + block.canonical_root(), + state, + &mut reward_cache, + true, + ) .map_err(beacon_chain_error)?; block_rewards.push(block_reward); } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 59e6554aee..48178f4f0d 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -45,11 +45,12 @@ use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use types::{ - Attestation, AttesterSlashing, BeaconStateError, BlindedPayload, CommitteeCache, - ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload, ProposerPreparationData, - ProposerSlashing, RelativeEpoch, Signature, SignedAggregateAndProof, SignedBeaconBlock, - SignedBlindedBeaconBlock, SignedContributionAndProof, SignedValidatorRegistrationData, - SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData, + Attestation, AttestationData, AttesterSlashing, BeaconStateError, BlindedPayload, + CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload, + ProposerPreparationData, ProposerSlashing, RelativeEpoch, Signature, SignedAggregateAndProof, + SignedBeaconBlock, SignedBlindedBeaconBlock, SignedContributionAndProof, + SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage, + SyncContributionData, }; use version::{ add_consensus_version_header, execution_optimistic_fork_versioned_response, @@ -1305,13 +1306,11 @@ pub fn serve( .and_then( |chain: Arc>, query: api_types::AttestationPoolQuery| { blocking_json_task(move || { - let query_filter = |attestation: &Attestation| { - query - .slot - .map_or(true, |slot| slot == attestation.data.slot) + let query_filter = |data: &AttestationData| { + query.slot.map_or(true, |slot| slot == data.slot) && query .committee_index - .map_or(true, |index| index == attestation.data.index) + .map_or(true, |index| index == data.index) }; let mut attestations = chain.op_pool.get_filtered_attestations(query_filter); @@ -1321,7 +1320,7 @@ pub fn serve( .read() .iter() .cloned() - .filter(query_filter), + .filter(|att| query_filter(&att.data)), ); Ok(api_types::GenericResponse::from(attestations)) }) @@ -2317,12 +2316,13 @@ pub fn serve( ); failures.push(api_types::Failure::new(index, format!("Fork choice: {:?}", e))); } - if let Err(e) = chain.add_to_block_inclusion_pool(&verified_aggregate) { - warn!(log, - "Could not add verified aggregate attestation to the inclusion pool"; - "error" => format!("{:?}", e), - "request_index" => index, - ); + if let Err(e) = chain.add_to_block_inclusion_pool(verified_aggregate) { + warn!( + log, + "Could not add verified aggregate attestation to the inclusion pool"; + "error" => ?e, + "request_index" => index, + ); failures.push(api_types::Failure::new(index, format!("Op pool: {:?}", e))); } } diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index e6625e43f8..93ed1b463b 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -54,6 +54,12 @@ impl VerifiedAttestation for VerifiedUnaggregate { fn indexed_attestation(&self) -> &IndexedAttestation { &self.indexed_attestation } + + fn into_attestation_and_indices(self) -> (Attestation, Vec) { + let attestation = *self.attestation; + let attesting_indices = self.indexed_attestation.attesting_indices.into(); + (attestation, attesting_indices) + } } /// An attestation that failed validation by the `BeaconChain`. @@ -81,6 +87,13 @@ impl VerifiedAttestation for VerifiedAggregate { fn indexed_attestation(&self) -> &IndexedAttestation { &self.indexed_attestation } + + /// Efficient clone-free implementation that moves out of the `Box`. + fn into_attestation_and_indices(self) -> (Attestation, Vec) { + let attestation = self.signed_aggregate.message.aggregate; + let attesting_indices = self.indexed_attestation.attesting_indices.into(); + (attestation, attesting_indices) + } } /// An attestation that failed validation by the `BeaconChain`. @@ -595,7 +608,7 @@ impl Worker { } } - if let Err(e) = self.chain.add_to_block_inclusion_pool(&verified_aggregate) { + if let Err(e) = self.chain.add_to_block_inclusion_pool(verified_aggregate) { debug!( self.log, "Attestation invalid for op pool"; diff --git a/beacon_node/operation_pool/Cargo.toml b/beacon_node/operation_pool/Cargo.toml index 6b8b8eb145..1d67ecdccc 100644 --- a/beacon_node/operation_pool/Cargo.toml +++ b/beacon_node/operation_pool/Cargo.toml @@ -18,7 +18,9 @@ rayon = "1.5.0" serde = "1.0.116" serde_derive = "1.0.116" store = { path = "../store" } +bitvec = "1" [dev-dependencies] beacon_chain = { path = "../beacon_chain" } tokio = { version = "1.14.0", features = ["rt-multi-thread"] } +maplit = "1.0.2" diff --git a/beacon_node/operation_pool/src/attestation.rs b/beacon_node/operation_pool/src/attestation.rs index 2f7fba4540..4af4edc0e4 100644 --- a/beacon_node/operation_pool/src/attestation.rs +++ b/beacon_node/operation_pool/src/attestation.rs @@ -1,4 +1,6 @@ +use crate::attestation_storage::AttestationRef; use crate::max_cover::MaxCover; +use crate::reward_cache::RewardCache; use state_processing::common::{ altair, base, get_attestation_participation_flag_indices, get_attesting_indices, }; @@ -12,34 +14,35 @@ use types::{ #[derive(Debug, Clone)] pub struct AttMaxCover<'a, T: EthSpec> { /// Underlying attestation. - pub att: &'a Attestation, + pub att: AttestationRef<'a, T>, /// Mapping of validator indices and their rewards. pub fresh_validators_rewards: HashMap, } impl<'a, T: EthSpec> AttMaxCover<'a, T> { pub fn new( - att: &'a Attestation, + att: AttestationRef<'a, T>, state: &BeaconState, + reward_cache: &'a RewardCache, total_active_balance: u64, spec: &ChainSpec, ) -> Option { if let BeaconState::Base(ref base_state) = state { Self::new_for_base(att, state, base_state, total_active_balance, spec) } else { - Self::new_for_altair(att, state, total_active_balance, spec) + Self::new_for_altair(att, state, reward_cache, total_active_balance, spec) } } /// Initialise an attestation cover object for base/phase0 hard fork. pub fn new_for_base( - att: &'a Attestation, + att: AttestationRef<'a, T>, state: &BeaconState, base_state: &BeaconStateBase, total_active_balance: u64, spec: &ChainSpec, ) -> Option { - let fresh_validators = earliest_attestation_validators(att, state, base_state); + let fresh_validators = earliest_attestation_validators(&att, state, base_state); let committee = state .get_beacon_committee(att.data.slot, att.data.index) .ok()?; @@ -67,45 +70,41 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> { /// Initialise an attestation cover object for Altair or later. pub fn new_for_altair( - att: &'a Attestation, + att: AttestationRef<'a, T>, state: &BeaconState, + reward_cache: &'a RewardCache, total_active_balance: u64, spec: &ChainSpec, ) -> Option { - let committee = state - .get_beacon_committee(att.data.slot, att.data.index) - .ok()?; - let attesting_indices = - get_attesting_indices::(committee.committee, &att.aggregation_bits).ok()?; + let att_data = att.attestation_data(); - let participation_list = if att.data.target.epoch == state.current_epoch() { - state.current_epoch_participation().ok()? - } else if att.data.target.epoch == state.previous_epoch() { - state.previous_epoch_participation().ok()? - } else { - return None; - }; - - let inclusion_delay = state.slot().as_u64().checked_sub(att.data.slot.as_u64())?; + let inclusion_delay = state.slot().as_u64().checked_sub(att_data.slot.as_u64())?; let att_participation_flags = - get_attestation_participation_flag_indices(state, &att.data, inclusion_delay, spec) + get_attestation_participation_flag_indices(state, &att_data, inclusion_delay, spec) .ok()?; let base_reward_per_increment = altair::BaseRewardPerIncrement::new(total_active_balance, spec).ok()?; - let fresh_validators_rewards = attesting_indices + let fresh_validators_rewards = att + .indexed + .attesting_indices .iter() .filter_map(|&index| { + if reward_cache + .has_attested_in_epoch(index, att_data.target.epoch) + .ok()? + { + return None; + } + let mut proposer_reward_numerator = 0; - let participation = participation_list.get(index)?; let base_reward = - altair::get_base_reward(state, index, base_reward_per_increment, spec).ok()?; + altair::get_base_reward(state, index as usize, base_reward_per_increment, spec) + .ok()?; for (flag_index, weight) in PARTICIPATION_FLAG_WEIGHTS.iter().enumerate() { - if att_participation_flags.contains(&flag_index) - && !participation.has_flag(flag_index).ok()? - { + if att_participation_flags.contains(&flag_index) { proposer_reward_numerator += base_reward.checked_mul(*weight)?; } } @@ -113,7 +112,7 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> { let proposer_reward = proposer_reward_numerator .checked_div(WEIGHT_DENOMINATOR.checked_mul(spec.proposer_reward_quotient)?)?; - Some((index as u64, proposer_reward)).filter(|_| proposer_reward != 0) + Some((index, proposer_reward)).filter(|_| proposer_reward != 0) }) .collect(); @@ -126,10 +125,15 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> { impl<'a, T: EthSpec> MaxCover for AttMaxCover<'a, T> { type Object = Attestation; + type Intermediate = AttestationRef<'a, T>; type Set = HashMap; - fn object(&self) -> &Attestation { - self.att + fn intermediate(&self) -> &AttestationRef<'a, T> { + &self.att + } + + fn convert_to_object(att_ref: &AttestationRef<'a, T>) -> Attestation { + att_ref.clone_as_attestation() } fn covering_set(&self) -> &HashMap { @@ -148,7 +152,7 @@ impl<'a, T: EthSpec> MaxCover for AttMaxCover<'a, T> { /// of slashable voting, which is rare. fn update_covering_set( &mut self, - best_att: &Attestation, + best_att: &AttestationRef<'a, T>, covered_validators: &HashMap, ) { if self.att.data.slot == best_att.data.slot && self.att.data.index == best_att.data.index { @@ -172,16 +176,16 @@ impl<'a, T: EthSpec> MaxCover for AttMaxCover<'a, T> { /// /// This isn't optimal, but with the Altair fork this code is obsolete and not worth upgrading. pub fn earliest_attestation_validators( - attestation: &Attestation, + attestation: &AttestationRef, state: &BeaconState, base_state: &BeaconStateBase, ) -> BitList { // Bitfield of validators whose attestations are new/fresh. - let mut new_validators = attestation.aggregation_bits.clone(); + let mut new_validators = attestation.indexed.aggregation_bits.clone(); - let state_attestations = if attestation.data.target.epoch == state.current_epoch() { + let state_attestations = if attestation.checkpoint.target_epoch == state.current_epoch() { &base_state.current_epoch_attestations - } else if attestation.data.target.epoch == state.previous_epoch() { + } else if attestation.checkpoint.target_epoch == state.previous_epoch() { &base_state.previous_epoch_attestations } else { return BitList::with_capacity(0).unwrap(); diff --git a/beacon_node/operation_pool/src/attestation_id.rs b/beacon_node/operation_pool/src/attestation_id.rs index f496ecb3a3..b65975787e 100644 --- a/beacon_node/operation_pool/src/attestation_id.rs +++ b/beacon_node/operation_pool/src/attestation_id.rs @@ -1,45 +1,12 @@ use serde_derive::{Deserialize, Serialize}; -use ssz::ssz_encode; use ssz_derive::{Decode, Encode}; -use types::{AttestationData, ChainSpec, Domain, Epoch, Fork, Hash256}; /// Serialized `AttestationData` augmented with a domain to encode the fork info. +/// +/// [DEPRECATED] To be removed once all nodes have updated to schema v12. #[derive( PartialEq, Eq, Clone, Hash, Debug, PartialOrd, Ord, Encode, Decode, Serialize, Deserialize, )] pub struct AttestationId { v: Vec, } - -/// Number of domain bytes that the end of an attestation ID is padded with. -const DOMAIN_BYTES_LEN: usize = std::mem::size_of::(); - -impl AttestationId { - pub fn from_data( - attestation: &AttestationData, - fork: &Fork, - genesis_validators_root: Hash256, - spec: &ChainSpec, - ) -> Self { - let mut bytes = ssz_encode(attestation); - let epoch = attestation.target.epoch; - bytes.extend_from_slice( - AttestationId::compute_domain_bytes(epoch, fork, genesis_validators_root, spec) - .as_bytes(), - ); - AttestationId { v: bytes } - } - - pub fn compute_domain_bytes( - epoch: Epoch, - fork: &Fork, - genesis_validators_root: Hash256, - spec: &ChainSpec, - ) -> Hash256 { - spec.get_domain(epoch, Domain::BeaconAttester, fork, genesis_validators_root) - } - - pub fn domain_bytes_match(&self, domain_bytes: &Hash256) -> bool { - &self.v[self.v.len() - DOMAIN_BYTES_LEN..] == domain_bytes.as_bytes() - } -} diff --git a/beacon_node/operation_pool/src/attestation_storage.rs b/beacon_node/operation_pool/src/attestation_storage.rs new file mode 100644 index 0000000000..0fb9bafd82 --- /dev/null +++ b/beacon_node/operation_pool/src/attestation_storage.rs @@ -0,0 +1,245 @@ +use crate::AttestationStats; +use itertools::Itertools; +use std::collections::HashMap; +use types::{ + AggregateSignature, Attestation, AttestationData, BeaconState, BitList, Checkpoint, Epoch, + EthSpec, Hash256, Slot, +}; + +#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] +pub struct CheckpointKey { + pub source: Checkpoint, + pub target_epoch: Epoch, +} + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct CompactAttestationData { + pub slot: Slot, + pub index: u64, + pub beacon_block_root: Hash256, + pub target_root: Hash256, +} + +#[derive(Debug, PartialEq)] +pub struct CompactIndexedAttestation { + pub attesting_indices: Vec, + pub aggregation_bits: BitList, + pub signature: AggregateSignature, +} + +#[derive(Debug)] +pub struct SplitAttestation { + pub checkpoint: CheckpointKey, + pub data: CompactAttestationData, + pub indexed: CompactIndexedAttestation, +} + +#[derive(Debug, Clone)] +pub struct AttestationRef<'a, T: EthSpec> { + pub checkpoint: &'a CheckpointKey, + pub data: &'a CompactAttestationData, + pub indexed: &'a CompactIndexedAttestation, +} + +#[derive(Debug, Default, PartialEq)] +pub struct AttestationMap { + checkpoint_map: HashMap>, +} + +#[derive(Debug, Default, PartialEq)] +pub struct AttestationDataMap { + attestations: HashMap>>, +} + +impl SplitAttestation { + pub fn new(attestation: Attestation, attesting_indices: Vec) -> Self { + let checkpoint = CheckpointKey { + source: attestation.data.source, + target_epoch: attestation.data.target.epoch, + }; + let data = CompactAttestationData { + slot: attestation.data.slot, + index: attestation.data.index, + beacon_block_root: attestation.data.beacon_block_root, + target_root: attestation.data.target.root, + }; + let indexed = CompactIndexedAttestation { + attesting_indices, + aggregation_bits: attestation.aggregation_bits, + signature: attestation.signature, + }; + Self { + checkpoint, + data, + indexed, + } + } + + pub fn as_ref(&self) -> AttestationRef { + AttestationRef { + checkpoint: &self.checkpoint, + data: &self.data, + indexed: &self.indexed, + } + } +} + +impl<'a, T: EthSpec> AttestationRef<'a, T> { + pub fn attestation_data(&self) -> AttestationData { + AttestationData { + slot: self.data.slot, + index: self.data.index, + beacon_block_root: self.data.beacon_block_root, + source: self.checkpoint.source, + target: Checkpoint { + epoch: self.checkpoint.target_epoch, + root: self.data.target_root, + }, + } + } + + pub fn clone_as_attestation(&self) -> Attestation { + Attestation { + aggregation_bits: self.indexed.aggregation_bits.clone(), + data: self.attestation_data(), + signature: self.indexed.signature.clone(), + } + } +} + +impl CheckpointKey { + /// Return two checkpoint keys: `(previous, current)` for the previous and current epochs of + /// the `state`. + pub fn keys_for_state(state: &BeaconState) -> (Self, Self) { + ( + CheckpointKey { + source: state.previous_justified_checkpoint(), + target_epoch: state.previous_epoch(), + }, + CheckpointKey { + source: state.current_justified_checkpoint(), + target_epoch: state.current_epoch(), + }, + ) + } +} + +impl CompactIndexedAttestation { + pub fn signers_disjoint_from(&self, other: &Self) -> bool { + self.aggregation_bits + .intersection(&other.aggregation_bits) + .is_zero() + } + + pub fn aggregate(&mut self, other: &Self) { + self.attesting_indices = self + .attesting_indices + .drain(..) + .merge(other.attesting_indices.iter().copied()) + .dedup() + .collect(); + self.aggregation_bits = self.aggregation_bits.union(&other.aggregation_bits); + self.signature.add_assign_aggregate(&other.signature); + } +} + +impl AttestationMap { + pub fn insert(&mut self, attestation: Attestation, attesting_indices: Vec) { + let SplitAttestation { + checkpoint, + data, + indexed, + } = SplitAttestation::new(attestation, attesting_indices); + + let attestation_map = self + .checkpoint_map + .entry(checkpoint) + .or_insert_with(AttestationDataMap::default); + let attestations = attestation_map + .attestations + .entry(data) + .or_insert_with(Vec::new); + + // Greedily aggregate the attestation with all existing attestations. + // NOTE: this is sub-optimal and in future we will remove this in favour of max-clique + // aggregation. + let mut aggregated = false; + for existing_attestation in attestations.iter_mut() { + if existing_attestation.signers_disjoint_from(&indexed) { + existing_attestation.aggregate(&indexed); + aggregated = true; + } else if *existing_attestation == indexed { + aggregated = true; + } + } + + if !aggregated { + attestations.push(indexed); + } + } + + /// Iterate all attestations matching the given `checkpoint_key`. + pub fn get_attestations<'a>( + &'a self, + checkpoint_key: &'a CheckpointKey, + ) -> impl Iterator> + 'a { + self.checkpoint_map + .get(checkpoint_key) + .into_iter() + .flat_map(|attestation_map| attestation_map.iter(checkpoint_key)) + } + + /// Iterate all attestations in the map. + pub fn iter(&self) -> impl Iterator> { + self.checkpoint_map + .iter() + .flat_map(|(checkpoint_key, attestation_map)| attestation_map.iter(checkpoint_key)) + } + + /// Prune attestations that are from before the previous epoch. + pub fn prune(&mut self, current_epoch: Epoch) { + self.checkpoint_map + .retain(|checkpoint_key, _| current_epoch <= checkpoint_key.target_epoch + 1); + } + + /// Statistics about all attestations stored in the map. + pub fn stats(&self) -> AttestationStats { + self.checkpoint_map + .values() + .map(AttestationDataMap::stats) + .fold(AttestationStats::default(), |mut acc, new| { + acc.num_attestations += new.num_attestations; + acc.num_attestation_data += new.num_attestation_data; + acc.max_aggregates_per_data = + std::cmp::max(acc.max_aggregates_per_data, new.max_aggregates_per_data); + acc + }) + } +} + +impl AttestationDataMap { + pub fn iter<'a>( + &'a self, + checkpoint_key: &'a CheckpointKey, + ) -> impl Iterator> + 'a { + self.attestations.iter().flat_map(|(data, vec_indexed)| { + vec_indexed.iter().map(|indexed| AttestationRef { + checkpoint: checkpoint_key, + data, + indexed, + }) + }) + } + + pub fn stats(&self) -> AttestationStats { + let mut stats = AttestationStats::default(); + + for aggregates in self.attestations.values() { + stats.num_attestations += aggregates.len(); + stats.num_attestation_data += 1; + stats.max_aggregates_per_data = + std::cmp::max(stats.max_aggregates_per_data, aggregates.len()); + } + stats + } +} diff --git a/beacon_node/operation_pool/src/attester_slashing.rs b/beacon_node/operation_pool/src/attester_slashing.rs index 2cb63ad252..f5916384d4 100644 --- a/beacon_node/operation_pool/src/attester_slashing.rs +++ b/beacon_node/operation_pool/src/attester_slashing.rs @@ -39,14 +39,18 @@ impl<'a, T: EthSpec> AttesterSlashingMaxCover<'a, T> { impl<'a, T: EthSpec> MaxCover for AttesterSlashingMaxCover<'a, T> { /// The result type, of which we would eventually like a collection of maximal quality. type Object = AttesterSlashing; + type Intermediate = AttesterSlashing; /// The type used to represent sets. type Set = HashMap; - /// Extract an object for inclusion in a solution. - fn object(&self) -> &AttesterSlashing { + fn intermediate(&self) -> &AttesterSlashing { self.slashing } + fn convert_to_object(slashing: &AttesterSlashing) -> AttesterSlashing { + slashing.clone() + } + /// Get the set of elements covered. fn covering_set(&self) -> &HashMap { &self.effective_balances diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index 771dca12f6..8c335189c6 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -1,34 +1,38 @@ mod attestation; mod attestation_id; +mod attestation_storage; mod attester_slashing; mod max_cover; mod metrics; mod persistence; +mod reward_cache; mod sync_aggregate_id; pub use attestation::AttMaxCover; +pub use attestation_storage::{AttestationRef, SplitAttestation}; pub use max_cover::MaxCover; -pub use persistence::{PersistedOperationPool, PersistedOperationPoolAltair}; +pub use persistence::{ + PersistedOperationPool, PersistedOperationPoolV12, PersistedOperationPoolV5, +}; +pub use reward_cache::RewardCache; +use crate::attestation_storage::{AttestationMap, CheckpointKey}; use crate::sync_aggregate_id::SyncAggregateId; -use attestation_id::AttestationId; use attester_slashing::AttesterSlashingMaxCover; use max_cover::maximum_cover; -use parking_lot::RwLock; +use parking_lot::{RwLock, RwLockWriteGuard}; use state_processing::per_block_processing::errors::AttestationValidationError; use state_processing::per_block_processing::{ - get_slashable_indices_modular, verify_attestation_for_block_inclusion, verify_exit, - VerifySignatures, + get_slashable_indices_modular, verify_exit, VerifySignatures, }; -use state_processing::SigVerifiedOp; +use state_processing::{SigVerifiedOp, VerifyOperation}; use std::collections::{hash_map::Entry, HashMap, HashSet}; use std::marker::PhantomData; use std::ptr; use types::{ - sync_aggregate::Error as SyncAggregateError, typenum::Unsigned, Attestation, AttesterSlashing, - BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, ForkVersion, Hash256, - ProposerSlashing, SignedVoluntaryExit, Slot, SyncAggregate, SyncCommitteeContribution, - Validator, + sync_aggregate::Error as SyncAggregateError, typenum::Unsigned, Attestation, AttestationData, + AttesterSlashing, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, ProposerSlashing, + SignedVoluntaryExit, Slot, SyncAggregate, SyncCommitteeContribution, Validator, }; type SyncContributions = RwLock>>>; @@ -36,15 +40,17 @@ type SyncContributions = RwLock { /// Map from attestation ID (see below) to vectors of attestations. - attestations: RwLock>>>, + attestations: RwLock>, /// Map from sync aggregate ID to the best `SyncCommitteeContribution`s seen for that ID. sync_contributions: SyncContributions, /// Set of attester slashings, and the fork version they were verified against. - attester_slashings: RwLock, ForkVersion)>>, + attester_slashings: RwLock, T>>>, /// Map from proposer index to slashing. - proposer_slashings: RwLock>, + proposer_slashings: RwLock>>, /// Map from exiting validator to their exit data. - voluntary_exits: RwLock>, + voluntary_exits: RwLock>>, + /// Reward cache for accelerating attestation packing. + reward_cache: RwLock, _phantom: PhantomData, } @@ -53,9 +59,16 @@ pub enum OpPoolError { GetAttestationsTotalBalanceError(BeaconStateError), GetBlockRootError(BeaconStateError), SyncAggregateError(SyncAggregateError), + RewardCacheUpdatePrevEpoch(BeaconStateError), + RewardCacheUpdateCurrEpoch(BeaconStateError), + RewardCacheGetBlockRoot(BeaconStateError), + RewardCacheWrongEpoch, + RewardCacheValidatorUnknown(BeaconStateError), + RewardCacheOutOfBounds, IncorrectOpPoolVariant, } +#[derive(Default)] pub struct AttestationStats { /// Total number of attestations for all committeees/indices/votes. pub num_attestations: usize, @@ -176,95 +189,45 @@ impl OperationPool { pub fn insert_attestation( &self, attestation: Attestation, - fork: &Fork, - genesis_validators_root: Hash256, - spec: &ChainSpec, + attesting_indices: Vec, ) -> Result<(), AttestationValidationError> { - let id = AttestationId::from_data(&attestation.data, fork, genesis_validators_root, spec); - - // Take a write lock on the attestations map. - let mut attestations = self.attestations.write(); - - let existing_attestations = match attestations.entry(id) { - Entry::Vacant(entry) => { - entry.insert(vec![attestation]); - return Ok(()); - } - Entry::Occupied(entry) => entry.into_mut(), - }; - - let mut aggregated = false; - for existing_attestation in existing_attestations.iter_mut() { - if existing_attestation.signers_disjoint_from(&attestation) { - existing_attestation.aggregate(&attestation); - aggregated = true; - } else if *existing_attestation == attestation { - aggregated = true; - } - } - - if !aggregated { - existing_attestations.push(attestation); - } - + self.attestations + .write() + .insert(attestation, attesting_indices); Ok(()) } /// Total number of attestations in the pool, including attestations for the same data. pub fn num_attestations(&self) -> usize { - self.attestations.read().values().map(Vec::len).sum() + self.attestation_stats().num_attestations } pub fn attestation_stats(&self) -> AttestationStats { - let mut num_attestations = 0; - let mut num_attestation_data = 0; - let mut max_aggregates_per_data = 0; - - for aggregates in self.attestations.read().values() { - num_attestations += aggregates.len(); - num_attestation_data += 1; - max_aggregates_per_data = std::cmp::max(max_aggregates_per_data, aggregates.len()); - } - AttestationStats { - num_attestations, - num_attestation_data, - max_aggregates_per_data, - } + self.attestations.read().stats() } /// Return all valid attestations for the given epoch, for use in max cover. + #[allow(clippy::too_many_arguments)] fn get_valid_attestations_for_epoch<'a>( &'a self, - epoch: Epoch, - all_attestations: &'a HashMap>>, + checkpoint_key: &'a CheckpointKey, + all_attestations: &'a AttestationMap, state: &'a BeaconState, + reward_cache: &'a RewardCache, total_active_balance: u64, - validity_filter: impl FnMut(&&Attestation) -> bool + Send, + validity_filter: impl FnMut(&AttestationRef<'a, T>) -> bool + Send, spec: &'a ChainSpec, ) -> impl Iterator> + Send { - let domain_bytes = AttestationId::compute_domain_bytes( - epoch, - &state.fork(), - state.genesis_validators_root(), - spec, - ); all_attestations - .iter() - .filter(move |(key, _)| key.domain_bytes_match(&domain_bytes)) - .flat_map(|(_, attestations)| attestations) - .filter(move |attestation| attestation.data.target.epoch == epoch) - .filter(move |attestation| { - // Ensure attestations are valid for block inclusion - verify_attestation_for_block_inclusion( - state, - attestation, - VerifySignatures::False, - spec, - ) - .is_ok() + .get_attestations(checkpoint_key) + .filter(|att| { + att.data.slot + spec.min_attestation_inclusion_delay <= state.slot() + && state.slot() <= att.data.slot + T::slots_per_epoch() }) .filter(validity_filter) - .filter_map(move |att| AttMaxCover::new(att, state, total_active_balance, spec)) + .filter_map(move |att| { + AttMaxCover::new(att, state, reward_cache, total_active_balance, spec) + }) } /// Get a list of attestations for inclusion in a block. @@ -276,18 +239,24 @@ impl OperationPool { pub fn get_attestations( &self, state: &BeaconState, - prev_epoch_validity_filter: impl FnMut(&&Attestation) -> bool + Send, - curr_epoch_validity_filter: impl FnMut(&&Attestation) -> bool + Send, + prev_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send, + curr_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send, spec: &ChainSpec, ) -> Result>, OpPoolError> { // Attestations for the current fork, which may be from the current or previous epoch. - let prev_epoch = state.previous_epoch(); - let current_epoch = state.current_epoch(); + let (prev_epoch_key, curr_epoch_key) = CheckpointKey::keys_for_state(state); let all_attestations = self.attestations.read(); let total_active_balance = state .get_total_active_balance() .map_err(OpPoolError::GetAttestationsTotalBalanceError)?; + // Update the reward cache. + let reward_timer = metrics::start_timer(&metrics::BUILD_REWARD_CACHE_TIME); + let mut reward_cache = self.reward_cache.write(); + reward_cache.update(state)?; + let reward_cache = RwLockWriteGuard::downgrade(reward_cache); + drop(reward_timer); + // Split attestations for the previous & current epochs, so that we // can optimise them individually in parallel. let mut num_prev_valid = 0_i64; @@ -295,9 +264,10 @@ impl OperationPool { let prev_epoch_att = self .get_valid_attestations_for_epoch( - prev_epoch, + &prev_epoch_key, &*all_attestations, state, + &*reward_cache, total_active_balance, prev_epoch_validity_filter, spec, @@ -305,9 +275,10 @@ impl OperationPool { .inspect(|_| num_prev_valid += 1); let curr_epoch_att = self .get_valid_attestations_for_epoch( - current_epoch, + &curr_epoch_key, &*all_attestations, state, + &*reward_cache, total_active_balance, curr_epoch_validity_filter, spec, @@ -328,7 +299,7 @@ impl OperationPool { move || { let _timer = metrics::start_timer(&metrics::ATTESTATION_PREV_EPOCH_PACKING_TIME); // If we're in the genesis epoch, just use the current epoch attestations. - if prev_epoch == current_epoch { + if prev_epoch_key == curr_epoch_key { vec![] } else { maximum_cover(prev_epoch_att, prev_epoch_limit, "prev_epoch_attestations") @@ -356,36 +327,26 @@ impl OperationPool { /// Remove attestations which are too old to be included in a block. pub fn prune_attestations(&self, current_epoch: Epoch) { - // Prune attestations that are from before the previous epoch. - self.attestations.write().retain(|_, attestations| { - // All the attestations in this bucket have the same data, so we only need to - // check the first one. - attestations - .first() - .map_or(false, |att| current_epoch <= att.data.target.epoch + 1) - }); + self.attestations.write().prune(current_epoch); } /// Insert a proposer slashing into the pool. pub fn insert_proposer_slashing( &self, - verified_proposer_slashing: SigVerifiedOp, + verified_proposer_slashing: SigVerifiedOp, ) { - let slashing = verified_proposer_slashing.into_inner(); - self.proposer_slashings - .write() - .insert(slashing.signed_header_1.message.proposer_index, slashing); + self.proposer_slashings.write().insert( + verified_proposer_slashing.as_inner().proposer_index(), + verified_proposer_slashing, + ); } /// Insert an attester slashing into the pool. pub fn insert_attester_slashing( &self, - verified_slashing: SigVerifiedOp>, - fork: Fork, + verified_slashing: SigVerifiedOp, T>, ) { - self.attester_slashings - .write() - .insert((verified_slashing.into_inner(), fork.current_version)); + self.attester_slashings.write().insert(verified_slashing); } /// Get proposer and attester slashings for inclusion in a block. @@ -405,11 +366,13 @@ impl OperationPool { let proposer_slashings = filter_limit_operations( self.proposer_slashings.read().values(), |slashing| { - state - .validators() - .get(slashing.signed_header_1.message.proposer_index as usize) - .map_or(false, |validator| !validator.slashed) + slashing.signature_is_still_valid(&state.fork()) + && state + .validators() + .get(slashing.as_inner().signed_header_1.message.proposer_index as usize) + .map_or(false, |validator| !validator.slashed) }, + |slashing| slashing.as_inner().clone(), T::MaxProposerSlashings::to_usize(), ); @@ -417,30 +380,10 @@ impl OperationPool { // slashings. let mut to_be_slashed = proposer_slashings .iter() - .map(|s| s.signed_header_1.message.proposer_index) - .collect::>(); + .map(|s| s.proposer_index()) + .collect(); - let reader = self.attester_slashings.read(); - - let relevant_attester_slashings = reader.iter().flat_map(|(slashing, fork)| { - if *fork == state.fork().previous_version || *fork == state.fork().current_version { - AttesterSlashingMaxCover::new(slashing, &to_be_slashed, state) - } else { - None - } - }); - - let attester_slashings = maximum_cover( - relevant_attester_slashings, - T::MaxAttesterSlashings::to_usize(), - "attester_slashings", - ) - .into_iter() - .map(|cover| { - to_be_slashed.extend(cover.covering_set().keys()); - cover.object().clone() - }) - .collect(); + let attester_slashings = self.get_attester_slashings(state, &mut to_be_slashed); let voluntary_exits = self.get_voluntary_exits( state, @@ -451,6 +394,37 @@ impl OperationPool { (proposer_slashings, attester_slashings, voluntary_exits) } + /// Get attester slashings taking into account already slashed validators. + /// + /// This function *must* remain private. + fn get_attester_slashings( + &self, + state: &BeaconState, + to_be_slashed: &mut HashSet, + ) -> Vec> { + let reader = self.attester_slashings.read(); + + let relevant_attester_slashings = reader.iter().flat_map(|slashing| { + if slashing.signature_is_still_valid(&state.fork()) { + AttesterSlashingMaxCover::new(slashing.as_inner(), to_be_slashed, state) + } else { + None + } + }); + + maximum_cover( + relevant_attester_slashings, + T::MaxAttesterSlashings::to_usize(), + "attester_slashings", + ) + .into_iter() + .map(|cover| { + to_be_slashed.extend(cover.covering_set().keys()); + cover.intermediate().clone() + }) + .collect() + } + /// Prune proposer slashings for validators which are exited in the finalized epoch. pub fn prune_proposer_slashings(&self, head_state: &BeaconState) { prune_validator_hash_map( @@ -463,30 +437,23 @@ impl OperationPool { /// Prune attester slashings for all slashed or withdrawn validators, or attestations on another /// fork. pub fn prune_attester_slashings(&self, head_state: &BeaconState) { - self.attester_slashings - .write() - .retain(|(slashing, fork_version)| { - let previous_fork_is_finalized = - head_state.finalized_checkpoint().epoch >= head_state.fork().epoch; - // Prune any slashings which don't match the current fork version, or the previous - // fork version if it is not finalized yet. - let fork_ok = (*fork_version == head_state.fork().current_version) - || (*fork_version == head_state.fork().previous_version - && !previous_fork_is_finalized); - // Slashings that don't slash any validators can also be dropped. - let slashing_ok = - get_slashable_indices_modular(head_state, slashing, |_, validator| { - // Declare that a validator is still slashable if they have not exited prior - // to the finalized epoch. - // - // We cannot check the `slashed` field since the `head` is not finalized and - // a fork could un-slash someone. - validator.exit_epoch > head_state.finalized_checkpoint().epoch - }) - .map_or(false, |indices| !indices.is_empty()); + self.attester_slashings.write().retain(|slashing| { + // Check that the attestation's signature is still valid wrt the fork version. + let signature_ok = slashing.signature_is_still_valid(&head_state.fork()); + // Slashings that don't slash any validators can also be dropped. + let slashing_ok = + get_slashable_indices_modular(head_state, slashing.as_inner(), |_, validator| { + // Declare that a validator is still slashable if they have not exited prior + // to the finalized epoch. + // + // We cannot check the `slashed` field since the `head` is not finalized and + // a fork could un-slash someone. + validator.exit_epoch > head_state.finalized_checkpoint().epoch + }) + .map_or(false, |indices| !indices.is_empty()); - fork_ok && slashing_ok - }); + signature_ok && slashing_ok + }); } /// Total number of attester slashings in the pool. @@ -500,11 +467,10 @@ impl OperationPool { } /// Insert a voluntary exit that has previously been checked elsewhere. - pub fn insert_voluntary_exit(&self, verified_exit: SigVerifiedOp) { - let exit = verified_exit.into_inner(); + pub fn insert_voluntary_exit(&self, exit: SigVerifiedOp) { self.voluntary_exits .write() - .insert(exit.message.validator_index, exit); + .insert(exit.as_inner().message.validator_index, exit); } /// Get a list of voluntary exits for inclusion in a block. @@ -519,7 +485,12 @@ impl OperationPool { { filter_limit_operations( self.voluntary_exits.read().values(), - |exit| filter(exit) && verify_exit(state, exit, VerifySignatures::False, spec).is_ok(), + |exit| { + filter(exit.as_inner()) + && exit.signature_is_still_valid(&state.fork()) + && verify_exit(state, exit.as_inner(), VerifySignatures::False, spec).is_ok() + }, + |exit| exit.as_inner().clone(), T::MaxVoluntaryExits::to_usize(), ) } @@ -558,8 +529,8 @@ impl OperationPool { pub fn get_all_attestations(&self) -> Vec> { self.attestations .read() - .values() - .flat_map(|attns| attns.iter().cloned()) + .iter() + .map(|att| att.clone_as_attestation()) .collect() } @@ -568,14 +539,13 @@ impl OperationPool { /// This method may return objects that are invalid for block inclusion. pub fn get_filtered_attestations(&self, filter: F) -> Vec> where - F: Fn(&Attestation) -> bool, + F: Fn(&AttestationData) -> bool, { self.attestations .read() - .values() - .flat_map(|attns| attns.iter()) - .filter(|attn| filter(*attn)) - .cloned() + .iter() + .filter(|att| filter(&att.attestation_data())) + .map(|att| att.clone_as_attestation()) .collect() } @@ -586,7 +556,7 @@ impl OperationPool { self.attester_slashings .read() .iter() - .map(|(slashing, _)| slashing.clone()) + .map(|slashing| slashing.as_inner().clone()) .collect() } @@ -597,7 +567,7 @@ impl OperationPool { self.proposer_slashings .read() .iter() - .map(|(_, slashing)| slashing.clone()) + .map(|(_, slashing)| slashing.as_inner().clone()) .collect() } @@ -608,23 +578,29 @@ impl OperationPool { self.voluntary_exits .read() .iter() - .map(|(_, exit)| exit.clone()) + .map(|(_, exit)| exit.as_inner().clone()) .collect() } } /// Filter up to a maximum number of operations out of an iterator. -fn filter_limit_operations<'a, T: 'a, I, F>(operations: I, filter: F, limit: usize) -> Vec +fn filter_limit_operations<'a, T: 'a, V: 'a, I, F, G>( + operations: I, + filter: F, + mapping: G, + limit: usize, +) -> Vec where I: IntoIterator, F: Fn(&T) -> bool, + G: Fn(&T) -> V, T: Clone, { operations .into_iter() .filter(|x| filter(*x)) .take(limit) - .cloned() + .map(mapping) .collect() } @@ -634,17 +610,19 @@ where /// in the state's validator registry and then passed to `prune_if`. /// Entries for unknown validators will be kept. fn prune_validator_hash_map( - map: &mut HashMap, + map: &mut HashMap>, prune_if: F, head_state: &BeaconState, ) where F: Fn(&Validator) -> bool, + T: VerifyOperation, { - map.retain(|&validator_index, _| { - head_state - .validators() - .get(validator_index as usize) - .map_or(true, |validator| !prune_if(validator)) + map.retain(|&validator_index, op| { + op.signature_is_still_valid(&head_state.fork()) + && head_state + .validators() + .get(validator_index as usize) + .map_or(true, |validator| !prune_if(validator)) }); } @@ -655,6 +633,7 @@ impl PartialEq for OperationPool { return true; } *self.attestations.read() == *other.attestations.read() + && *self.sync_contributions.read() == *other.sync_contributions.read() && *self.attester_slashings.read() == *other.attester_slashings.read() && *self.proposer_slashings.read() == *other.proposer_slashings.read() && *self.voluntary_exits.read() == *other.voluntary_exits.read() @@ -669,7 +648,8 @@ mod release_tests { test_spec, BeaconChainHarness, EphemeralHarnessType, RelativeSyncCommittee, }; use lazy_static::lazy_static; - use state_processing::VerifyOperation; + use maplit::hashset; + use state_processing::{common::get_attesting_indices_from_state, VerifyOperation}; use std::collections::BTreeSet; use types::consts::altair::SYNC_COMMITTEE_SUBNET_COUNT; use types::*; @@ -689,6 +669,7 @@ mod release_tests { .spec_or_default(spec) .keypairs(KEYPAIRS[0..validator_count].to_vec()) .fresh_ephemeral_store() + .mock_execution_layer() .build(); harness.advance_slot(); @@ -714,7 +695,6 @@ mod release_tests { num_committees: usize, ) -> (BeaconChainHarness>, ChainSpec) { let mut spec = E::default_spec(); - spec.altair_fork_epoch = Some(Epoch::new(0)); let num_validators = @@ -780,10 +760,19 @@ mod release_tests { }) .unwrap(); + let att1_indices = get_attesting_indices_from_state(&state, &att1).unwrap(); + let att2_indices = get_attesting_indices_from_state(&state, &att2).unwrap(); + let att1_split = SplitAttestation::new(att1.clone(), att1_indices); + let att2_split = SplitAttestation::new(att2.clone(), att2_indices); + assert_eq!( att1.aggregation_bits.num_set_bits(), - earliest_attestation_validators(&att1, &state, state.as_base().unwrap()) - .num_set_bits() + earliest_attestation_validators( + &att1_split.as_ref(), + &state, + state.as_base().unwrap() + ) + .num_set_bits() ); state @@ -800,8 +789,12 @@ mod release_tests { assert_eq!( committees.get(0).unwrap().committee.len() - 2, - earliest_attestation_validators(&att2, &state, state.as_base().unwrap()) - .num_set_bits() + earliest_attestation_validators( + &att2_split.as_ref(), + &state, + state.as_base().unwrap() + ) + .num_set_bits() ); } } @@ -840,14 +833,12 @@ mod release_tests { ); for (atts, _) in attestations { - for att in atts.into_iter() { - op_pool - .insert_attestation(att.0, &state.fork(), state.genesis_validators_root(), spec) - .unwrap(); + for (att, _) in atts { + let attesting_indices = get_attesting_indices_from_state(&state, &att).unwrap(); + op_pool.insert_attestation(att, attesting_indices).unwrap(); } } - assert_eq!(op_pool.attestations.read().len(), committees.len()); assert_eq!(op_pool.num_attestations(), committees.len()); // Before the min attestation inclusion delay, get_attestations shouldn't return anything. @@ -913,17 +904,11 @@ mod release_tests { for (_, aggregate) in attestations { let att = aggregate.unwrap().message.aggregate; + let attesting_indices = get_attesting_indices_from_state(&state, &att).unwrap(); op_pool - .insert_attestation( - att.clone(), - &state.fork(), - state.genesis_validators_root(), - spec, - ) - .unwrap(); - op_pool - .insert_attestation(att, &state.fork(), state.genesis_validators_root(), spec) + .insert_attestation(att.clone(), attesting_indices.clone()) .unwrap(); + op_pool.insert_attestation(att, attesting_indices).unwrap(); } assert_eq!(op_pool.num_attestations(), committees.len()); @@ -1007,16 +992,17 @@ mod release_tests { .collect::>(); for att in aggs1.into_iter().chain(aggs2.into_iter()) { - op_pool - .insert_attestation(att, &state.fork(), state.genesis_validators_root(), spec) - .unwrap(); + let attesting_indices = get_attesting_indices_from_state(&state, &att).unwrap(); + op_pool.insert_attestation(att, attesting_indices).unwrap(); } } // The attestations should get aggregated into two attestations that comprise all // validators. - assert_eq!(op_pool.attestations.read().len(), committees.len()); - assert_eq!(op_pool.num_attestations(), 2 * committees.len()); + let stats = op_pool.attestation_stats(); + assert_eq!(stats.num_attestation_data, committees.len()); + assert_eq!(stats.num_attestations, 2 * committees.len()); + assert_eq!(stats.max_aggregates_per_data, 2); } /// Create a bunch of attestations signed by a small number of validators, and another @@ -1078,9 +1064,8 @@ mod release_tests { .collect::>(); for att in aggs { - op_pool - .insert_attestation(att, &state.fork(), state.genesis_validators_root(), spec) - .unwrap(); + let attesting_indices = get_attesting_indices_from_state(&state, &att).unwrap(); + op_pool.insert_attestation(att, attesting_indices).unwrap(); } }; @@ -1095,12 +1080,13 @@ mod release_tests { let num_small = target_committee_size / small_step_size; let num_big = target_committee_size / big_step_size; - assert_eq!(op_pool.attestations.read().len(), committees.len()); + let stats = op_pool.attestation_stats(); + assert_eq!(stats.num_attestation_data, committees.len()); assert_eq!( - op_pool.num_attestations(), + stats.num_attestations, (num_small + num_big) * committees.len() ); - assert!(op_pool.num_attestations() > max_attestations); + assert!(stats.num_attestations > max_attestations); *state.slot_mut() += spec.min_attestation_inclusion_delay; let best_attestations = op_pool @@ -1173,9 +1159,8 @@ mod release_tests { .collect::>(); for att in aggs { - op_pool - .insert_attestation(att, &state.fork(), state.genesis_validators_root(), spec) - .unwrap(); + let attesting_indices = get_attesting_indices_from_state(&state, &att).unwrap(); + op_pool.insert_attestation(att, attesting_indices).unwrap(); } }; @@ -1190,7 +1175,10 @@ mod release_tests { let num_small = target_committee_size / small_step_size; let num_big = target_committee_size / big_step_size; - assert_eq!(op_pool.attestations.read().len(), committees.len()); + assert_eq!( + op_pool.attestation_stats().num_attestation_data, + committees.len() + ); assert_eq!( op_pool.num_attestations(), (num_small + num_big) * committees.len() @@ -1210,11 +1198,21 @@ mod release_tests { // Used for asserting that rewards are in decreasing order. let mut prev_reward = u64::max_value(); - for att in &best_attestations { - let mut fresh_validators_rewards = - AttMaxCover::new(att, &state, total_active_balance, spec) - .unwrap() - .fresh_validators_rewards; + let mut reward_cache = RewardCache::default(); + reward_cache.update(&state).unwrap(); + + for att in best_attestations { + let attesting_indices = get_attesting_indices_from_state(&state, &att).unwrap(); + let split_attestation = SplitAttestation::new(att, attesting_indices); + let mut fresh_validators_rewards = AttMaxCover::new( + split_attestation.as_ref(), + &state, + &reward_cache, + total_active_balance, + spec, + ) + .unwrap() + .fresh_validators_rewards; // Remove validators covered by previous attestations. fresh_validators_rewards @@ -1281,10 +1279,7 @@ mod release_tests { let op_pool = OperationPool::::new(); let slashing = harness.make_attester_slashing(vec![1, 3, 5, 7, 9]); - op_pool.insert_attester_slashing( - slashing.clone().validate(&state, spec).unwrap(), - state.fork(), - ); + op_pool.insert_attester_slashing(slashing.clone().validate(&state, spec).unwrap()); op_pool.prune_attester_slashings(&state); assert_eq!( op_pool.get_slashings_and_exits(&state, &harness.spec).1, @@ -1305,22 +1300,10 @@ mod release_tests { let slashing_3 = harness.make_attester_slashing(vec![4, 5, 6]); let slashing_4 = harness.make_attester_slashing(vec![7, 8, 9, 10]); - op_pool.insert_attester_slashing( - slashing_1.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - slashing_2.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - slashing_3.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - slashing_4.clone().validate(&state, spec).unwrap(), - state.fork(), - ); + op_pool.insert_attester_slashing(slashing_1.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(slashing_2.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(slashing_3.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(slashing_4.clone().validate(&state, spec).unwrap()); let best_slashings = op_pool.get_slashings_and_exits(&state, &harness.spec); assert_eq!(best_slashings.1, vec![slashing_4, slashing_3]); @@ -1339,22 +1322,10 @@ mod release_tests { let slashing_3 = harness.make_attester_slashing(vec![5, 6]); let slashing_4 = harness.make_attester_slashing(vec![6]); - op_pool.insert_attester_slashing( - slashing_1.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - slashing_2.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - slashing_3.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - slashing_4.clone().validate(&state, spec).unwrap(), - state.fork(), - ); + op_pool.insert_attester_slashing(slashing_1.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(slashing_2.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(slashing_3.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(slashing_4.clone().validate(&state, spec).unwrap()); let best_slashings = op_pool.get_slashings_and_exits(&state, &harness.spec); assert_eq!(best_slashings.1, vec![slashing_1, slashing_3]); @@ -1374,18 +1345,9 @@ mod release_tests { let a_slashing_3 = harness.make_attester_slashing(vec![5, 6]); op_pool.insert_proposer_slashing(p_slashing.clone().validate(&state, spec).unwrap()); - op_pool.insert_attester_slashing( - a_slashing_1.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - a_slashing_2.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - a_slashing_3.clone().validate(&state, spec).unwrap(), - state.fork(), - ); + op_pool.insert_attester_slashing(a_slashing_1.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(a_slashing_2.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(a_slashing_3.clone().validate(&state, spec).unwrap()); let best_slashings = op_pool.get_slashings_and_exits(&state, &harness.spec); assert_eq!(best_slashings.1, vec![a_slashing_1, a_slashing_3]); @@ -1406,18 +1368,9 @@ mod release_tests { let slashing_2 = harness.make_attester_slashing(vec![5, 6]); let slashing_3 = harness.make_attester_slashing(vec![1, 2, 3]); - op_pool.insert_attester_slashing( - slashing_1.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - slashing_2.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - slashing_3.clone().validate(&state, spec).unwrap(), - state.fork(), - ); + op_pool.insert_attester_slashing(slashing_1.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(slashing_2.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(slashing_3.clone().validate(&state, spec).unwrap()); let best_slashings = op_pool.get_slashings_and_exits(&state, &harness.spec); assert_eq!(best_slashings.1, vec![slashing_1, slashing_3]); @@ -1438,18 +1391,9 @@ mod release_tests { let slashing_2 = harness.make_attester_slashing(vec![4, 5, 6]); let slashing_3 = harness.make_attester_slashing(vec![7, 8]); - op_pool.insert_attester_slashing( - slashing_1.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - slashing_2.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - slashing_3.clone().validate(&state, spec).unwrap(), - state.fork(), - ); + op_pool.insert_attester_slashing(slashing_1.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(slashing_2.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(slashing_3.clone().validate(&state, spec).unwrap()); let best_slashings = op_pool.get_slashings_and_exits(&state, &harness.spec); assert_eq!(best_slashings.1, vec![slashing_2, slashing_3]); @@ -1718,4 +1662,289 @@ mod release_tests { expected_bits ); } + + fn cross_fork_harness() -> (BeaconChainHarness>, ChainSpec) + { + let mut spec = test_spec::(); + + // Give some room to sign surround slashings. + spec.altair_fork_epoch = Some(Epoch::new(3)); + spec.bellatrix_fork_epoch = Some(Epoch::new(6)); + + // To make exits immediately valid. + spec.shard_committee_period = 0; + + let num_validators = 32; + + let harness = get_harness::(num_validators, Some(spec.clone())); + (harness, spec) + } + + /// Test several cross-fork voluntary exits: + /// + /// - phase0 exit (not valid after Bellatrix) + /// - phase0 exit signed with Altair fork version (only valid after Bellatrix) + #[tokio::test] + async fn cross_fork_exits() { + let (harness, spec) = cross_fork_harness::(); + let altair_fork_epoch = spec.altair_fork_epoch.unwrap(); + let bellatrix_fork_epoch = spec.bellatrix_fork_epoch.unwrap(); + let slots_per_epoch = MainnetEthSpec::slots_per_epoch(); + + let op_pool = OperationPool::::new(); + + // Sign an exit in phase0 with a phase0 epoch. + let exit1 = harness.make_voluntary_exit(0, Epoch::new(0)); + + // Advance to Altair. + harness + .extend_to_slot(altair_fork_epoch.start_slot(slots_per_epoch)) + .await; + let altair_head = harness.chain.canonical_head.cached_head().snapshot; + assert_eq!(altair_head.beacon_state.current_epoch(), altair_fork_epoch); + + // Add exit 1 to the op pool during Altair. It's still valid at this point and should be + // returned. + let verified_exit1 = exit1 + .clone() + .validate(&altair_head.beacon_state, &harness.chain.spec) + .unwrap(); + op_pool.insert_voluntary_exit(verified_exit1); + let exits = + op_pool.get_voluntary_exits(&altair_head.beacon_state, |_| true, &harness.chain.spec); + assert!(exits.contains(&exit1)); + assert_eq!(exits.len(), 1); + + // Advance to Bellatrix. + harness + .extend_to_slot(bellatrix_fork_epoch.start_slot(slots_per_epoch)) + .await; + let bellatrix_head = harness.chain.canonical_head.cached_head().snapshot; + assert_eq!( + bellatrix_head.beacon_state.current_epoch(), + bellatrix_fork_epoch + ); + + // Sign an exit with the Altair domain and a phase0 epoch. This is a weird type of exit + // that is valid because after the Bellatrix fork we'll use the Altair fork domain to verify + // all prior epochs. + let exit2 = harness.make_voluntary_exit(2, Epoch::new(0)); + let verified_exit2 = exit2 + .clone() + .validate(&bellatrix_head.beacon_state, &harness.chain.spec) + .unwrap(); + op_pool.insert_voluntary_exit(verified_exit2); + + // Attempting to fetch exit1 now should fail, despite it still being in the pool. + // exit2 should still be valid, because it was signed with the Altair fork domain. + assert_eq!(op_pool.voluntary_exits.read().len(), 2); + let exits = + op_pool.get_voluntary_exits(&bellatrix_head.beacon_state, |_| true, &harness.spec); + assert_eq!(&exits, &[exit2]); + } + + /// Test several cross-fork proposer slashings: + /// + /// - phase0 slashing (not valid after Bellatrix) + /// - Bellatrix signed with Altair fork version (not valid after Bellatrix) + /// - phase0 exit signed with Altair fork version (only valid after Bellatrix) + #[tokio::test] + async fn cross_fork_proposer_slashings() { + let (harness, spec) = cross_fork_harness::(); + let slots_per_epoch = MainnetEthSpec::slots_per_epoch(); + let altair_fork_epoch = spec.altair_fork_epoch.unwrap(); + let bellatrix_fork_epoch = spec.bellatrix_fork_epoch.unwrap(); + let bellatrix_fork_slot = bellatrix_fork_epoch.start_slot(slots_per_epoch); + + let op_pool = OperationPool::::new(); + + // Sign a proposer slashing in phase0 with a phase0 epoch. + let slashing1 = harness.make_proposer_slashing_at_slot(0, Some(Slot::new(1))); + + // Advance to Altair. + harness + .extend_to_slot(altair_fork_epoch.start_slot(slots_per_epoch)) + .await; + let altair_head = harness.chain.canonical_head.cached_head().snapshot; + assert_eq!(altair_head.beacon_state.current_epoch(), altair_fork_epoch); + + // Add slashing1 to the op pool during Altair. It's still valid at this point and should be + // returned. + let verified_slashing1 = slashing1 + .clone() + .validate(&altair_head.beacon_state, &harness.chain.spec) + .unwrap(); + op_pool.insert_proposer_slashing(verified_slashing1); + let (proposer_slashings, _, _) = + op_pool.get_slashings_and_exits(&altair_head.beacon_state, &harness.chain.spec); + assert!(proposer_slashings.contains(&slashing1)); + assert_eq!(proposer_slashings.len(), 1); + + // Sign a proposer slashing with a Bellatrix slot using the Altair fork domain. + // + // This slashing is valid only before the Bellatrix fork epoch. + let slashing2 = harness.make_proposer_slashing_at_slot(1, Some(bellatrix_fork_slot)); + let verified_slashing2 = slashing2 + .clone() + .validate(&altair_head.beacon_state, &harness.chain.spec) + .unwrap(); + op_pool.insert_proposer_slashing(verified_slashing2); + let (proposer_slashings, _, _) = + op_pool.get_slashings_and_exits(&altair_head.beacon_state, &harness.chain.spec); + assert!(proposer_slashings.contains(&slashing1)); + assert!(proposer_slashings.contains(&slashing2)); + assert_eq!(proposer_slashings.len(), 2); + + // Advance to Bellatrix. + harness.extend_to_slot(bellatrix_fork_slot).await; + let bellatrix_head = harness.chain.canonical_head.cached_head().snapshot; + assert_eq!( + bellatrix_head.beacon_state.current_epoch(), + bellatrix_fork_epoch + ); + + // Sign a proposer slashing with the Altair domain and a phase0 slot. This is a weird type + // of slashing that is only valid after the Bellatrix fork because we'll use the Altair fork + // domain to verify all prior epochs. + let slashing3 = harness.make_proposer_slashing_at_slot(2, Some(Slot::new(1))); + let verified_slashing3 = slashing3 + .clone() + .validate(&bellatrix_head.beacon_state, &harness.chain.spec) + .unwrap(); + op_pool.insert_proposer_slashing(verified_slashing3); + + // Attempting to fetch slashing1 now should fail, despite it still being in the pool. + // Likewise slashing2 is also invalid now because it should be signed with the + // Bellatrix fork version. + // slashing3 should still be valid, because it was signed with the Altair fork domain. + assert_eq!(op_pool.proposer_slashings.read().len(), 3); + let (proposer_slashings, _, _) = + op_pool.get_slashings_and_exits(&bellatrix_head.beacon_state, &harness.spec); + assert!(proposer_slashings.contains(&slashing3)); + assert_eq!(proposer_slashings.len(), 1); + } + + /// Test several cross-fork attester slashings: + /// + /// - both target epochs in phase0 (not valid after Bellatrix) + /// - both target epochs in Bellatrix but signed with Altair domain (not valid after Bellatrix) + /// - Altair attestation that surrounds a phase0 attestation (not valid after Bellatrix) + /// - both target epochs in phase0 but signed with Altair domain (only valid after Bellatrix) + #[tokio::test] + async fn cross_fork_attester_slashings() { + let (harness, spec) = cross_fork_harness::(); + let slots_per_epoch = MainnetEthSpec::slots_per_epoch(); + let zero_epoch = Epoch::new(0); + let altair_fork_epoch = spec.altair_fork_epoch.unwrap(); + let bellatrix_fork_epoch = spec.bellatrix_fork_epoch.unwrap(); + let bellatrix_fork_slot = bellatrix_fork_epoch.start_slot(slots_per_epoch); + + let op_pool = OperationPool::::new(); + + // Sign an attester slashing with the phase0 fork version, with both target epochs in phase0. + let slashing1 = harness.make_attester_slashing_with_epochs( + vec![0], + None, + Some(zero_epoch), + None, + Some(zero_epoch), + ); + + // Advance to Altair. + harness + .extend_to_slot(altair_fork_epoch.start_slot(slots_per_epoch)) + .await; + let altair_head = harness.chain.canonical_head.cached_head().snapshot; + assert_eq!(altair_head.beacon_state.current_epoch(), altair_fork_epoch); + + // Add slashing1 to the op pool during Altair. It's still valid at this point and should be + // returned. + let verified_slashing1 = slashing1 + .clone() + .validate(&altair_head.beacon_state, &harness.chain.spec) + .unwrap(); + op_pool.insert_attester_slashing(verified_slashing1); + + // Sign an attester slashing with two Bellatrix epochs using the Altair fork domain. + // + // This slashing is valid only before the Bellatrix fork epoch. + let slashing2 = harness.make_attester_slashing_with_epochs( + vec![1], + None, + Some(bellatrix_fork_epoch), + None, + Some(bellatrix_fork_epoch), + ); + let verified_slashing2 = slashing2 + .clone() + .validate(&altair_head.beacon_state, &harness.chain.spec) + .unwrap(); + op_pool.insert_attester_slashing(verified_slashing2); + let (_, attester_slashings, _) = + op_pool.get_slashings_and_exits(&altair_head.beacon_state, &harness.chain.spec); + assert!(attester_slashings.contains(&slashing1)); + assert!(attester_slashings.contains(&slashing2)); + assert_eq!(attester_slashings.len(), 2); + + // Sign an attester slashing where an Altair attestation surrounds a phase0 one. + // + // This slashing is valid only before the Bellatrix fork epoch. + let slashing3 = harness.make_attester_slashing_with_epochs( + vec![2], + Some(Epoch::new(0)), + Some(altair_fork_epoch), + Some(Epoch::new(1)), + Some(altair_fork_epoch - 1), + ); + let verified_slashing3 = slashing3 + .clone() + .validate(&altair_head.beacon_state, &harness.chain.spec) + .unwrap(); + op_pool.insert_attester_slashing(verified_slashing3); + + // All three slashings should be valid and returned from the pool at this point. + // Seeing as we can only extract 2 at time we'll just pretend that validator 0 is already + // slashed. + let mut to_be_slashed = hashset! {0}; + let attester_slashings = + op_pool.get_attester_slashings(&altair_head.beacon_state, &mut to_be_slashed); + assert!(attester_slashings.contains(&slashing2)); + assert!(attester_slashings.contains(&slashing3)); + assert_eq!(attester_slashings.len(), 2); + + // Advance to Bellatrix. + harness.extend_to_slot(bellatrix_fork_slot).await; + let bellatrix_head = harness.chain.canonical_head.cached_head().snapshot; + assert_eq!( + bellatrix_head.beacon_state.current_epoch(), + bellatrix_fork_epoch + ); + + // Sign an attester slashing with the Altair domain and phase0 epochs. This is a weird type + // of slashing that is only valid after the Bellatrix fork because we'll use the Altair fork + // domain to verify all prior epochs. + let slashing4 = harness.make_attester_slashing_with_epochs( + vec![3], + Some(Epoch::new(0)), + Some(altair_fork_epoch - 1), + Some(Epoch::new(0)), + Some(altair_fork_epoch - 1), + ); + let verified_slashing4 = slashing4 + .clone() + .validate(&bellatrix_head.beacon_state, &harness.chain.spec) + .unwrap(); + op_pool.insert_attester_slashing(verified_slashing4); + + // All slashings except slashing4 are now invalid (despite being present in the pool). + assert_eq!(op_pool.attester_slashings.read().len(), 4); + let (_, attester_slashings, _) = + op_pool.get_slashings_and_exits(&bellatrix_head.beacon_state, &harness.spec); + assert!(attester_slashings.contains(&slashing4)); + assert_eq!(attester_slashings.len(), 1); + + // Pruning the attester slashings should remove all but slashing4. + op_pool.prune_attester_slashings(&bellatrix_head.beacon_state); + assert_eq!(op_pool.attester_slashings.read().len(), 1); + } } diff --git a/beacon_node/operation_pool/src/max_cover.rs b/beacon_node/operation_pool/src/max_cover.rs index 8e50b8152e..2e629f786b 100644 --- a/beacon_node/operation_pool/src/max_cover.rs +++ b/beacon_node/operation_pool/src/max_cover.rs @@ -11,16 +11,21 @@ use itertools::Itertools; pub trait MaxCover: Clone { /// The result type, of which we would eventually like a collection of maximal quality. type Object: Clone; + /// The intermediate object type, which can be converted to `Object`. + type Intermediate: Clone; /// The type used to represent sets. type Set: Clone; - /// Extract an object for inclusion in a solution. - fn object(&self) -> &Self::Object; + /// Extract the intermediate object. + fn intermediate(&self) -> &Self::Intermediate; + + /// Convert the borrowed intermediate object to an owned object for the solution. + fn convert_to_object(intermediate: &Self::Intermediate) -> Self::Object; /// Get the set of elements covered. fn covering_set(&self) -> &Self::Set; /// Update the set of items covered, for the inclusion of some object in the solution. - fn update_covering_set(&mut self, max_obj: &Self::Object, max_set: &Self::Set); + fn update_covering_set(&mut self, max_obj: &Self::Intermediate, max_set: &Self::Set); /// The quality of this item's covering set, usually its cardinality. fn score(&self) -> usize; } @@ -86,7 +91,7 @@ where .filter(|x| x.available && x.item.score() != 0) .for_each(|x| { x.item - .update_covering_set(best.object(), best.covering_set()) + .update_covering_set(best.intermediate(), best.covering_set()) }); result.push(best); @@ -106,7 +111,7 @@ where .into_iter() .merge_by(cover2, |item1, item2| item1.score() >= item2.score()) .take(limit) - .map(|item| item.object().clone()) + .map(|item| T::convert_to_object(item.intermediate())) .collect() } @@ -121,12 +126,17 @@ mod test { T: Clone + Eq + Hash, { type Object = Self; + type Intermediate = Self; type Set = Self; - fn object(&self) -> &Self { + fn intermediate(&self) -> &Self { self } + fn convert_to_object(set: &Self) -> Self { + set.clone() + } + fn covering_set(&self) -> &Self { self } diff --git a/beacon_node/operation_pool/src/metrics.rs b/beacon_node/operation_pool/src/metrics.rs index 3fa5208a3d..6fd8567cef 100644 --- a/beacon_node/operation_pool/src/metrics.rs +++ b/beacon_node/operation_pool/src/metrics.rs @@ -3,6 +3,10 @@ use lazy_static::lazy_static; pub use lighthouse_metrics::*; lazy_static! { + pub static ref BUILD_REWARD_CACHE_TIME: Result = try_create_histogram( + "op_pool_build_reward_cache_time", + "Time to build the reward cache before packing attestations" + ); pub static ref ATTESTATION_PREV_EPOCH_PACKING_TIME: Result = try_create_histogram( "op_pool_attestation_prev_epoch_packing_time", "Time to pack previous epoch attestations" diff --git a/beacon_node/operation_pool/src/persistence.rs b/beacon_node/operation_pool/src/persistence.rs index 0769786097..ed15369df7 100644 --- a/beacon_node/operation_pool/src/persistence.rs +++ b/beacon_node/operation_pool/src/persistence.rs @@ -1,12 +1,13 @@ use crate::attestation_id::AttestationId; +use crate::attestation_storage::AttestationMap; use crate::sync_aggregate_id::SyncAggregateId; use crate::OpPoolError; use crate::OperationPool; use derivative::Derivative; use parking_lot::RwLock; -use serde_derive::{Deserialize, Serialize}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; +use state_processing::SigVerifiedOp; use store::{DBColumn, Error as StoreError, StoreItem}; use types::*; @@ -17,32 +18,42 @@ type PersistedSyncContributions = Vec<(SyncAggregateId, Vec { - /// Mapping from attestation ID to attestation mappings. - // We could save space by not storing the attestation ID, but it might - // be difficult to make that roundtrip due to eager aggregation. - attestations: Vec<(AttestationId, Vec>)>, + /// [DEPRECATED] Mapping from attestation ID to attestation mappings. + #[superstruct(only(V5))] + pub attestations_v5: Vec<(AttestationId, Vec>)>, + /// Attestations and their attesting indices. + #[superstruct(only(V12))] + pub attestations: Vec<(Attestation, Vec)>, /// Mapping from sync contribution ID to sync contributions and aggregate. - #[superstruct(only(Altair))] - sync_contributions: PersistedSyncContributions, + pub sync_contributions: PersistedSyncContributions, + /// [DEPRECATED] Attester slashings. + #[superstruct(only(V5))] + pub attester_slashings_v5: Vec<(AttesterSlashing, ForkVersion)>, /// Attester slashings. - attester_slashings: Vec<(AttesterSlashing, ForkVersion)>, - /// Proposer slashings. - proposer_slashings: Vec, - /// Voluntary exits. - voluntary_exits: Vec, + #[superstruct(only(V12))] + pub attester_slashings: Vec, T>>, + /// [DEPRECATED] Proposer slashings. + #[superstruct(only(V5))] + pub proposer_slashings_v5: Vec, + /// Proposer slashings with fork information. + #[superstruct(only(V12))] + pub proposer_slashings: Vec>, + /// [DEPRECATED] Voluntary exits. + #[superstruct(only(V5))] + pub voluntary_exits_v5: Vec, + /// Voluntary exits with fork information. + #[superstruct(only(V12))] + pub voluntary_exits: Vec>, } impl PersistedOperationPool { @@ -52,7 +63,12 @@ impl PersistedOperationPool { .attestations .read() .iter() - .map(|(att_id, att)| (att_id.clone(), att.clone())) + .map(|att| { + ( + att.clone_as_attestation(), + att.indexed.attesting_indices.clone(), + ) + }) .collect(); let sync_contributions = operation_pool @@ -83,7 +99,7 @@ impl PersistedOperationPool { .map(|(_, exit)| exit.clone()) .collect(); - PersistedOperationPool::Altair(PersistedOperationPoolAltair { + PersistedOperationPool::V12(PersistedOperationPoolV12 { attestations, sync_contributions, attester_slashings, @@ -92,45 +108,62 @@ impl PersistedOperationPool { }) } - /// Reconstruct an `OperationPool`. Sets `sync_contributions` to its `Default` if `self` matches - /// `PersistedOperationPool::Base`. + /// Reconstruct an `OperationPool`. pub fn into_operation_pool(self) -> Result, OpPoolError> { - let attestations = RwLock::new(self.attestations().iter().cloned().collect()); - let attester_slashings = RwLock::new(self.attester_slashings().iter().cloned().collect()); + let attester_slashings = RwLock::new(self.attester_slashings()?.iter().cloned().collect()); let proposer_slashings = RwLock::new( - self.proposer_slashings() + self.proposer_slashings()? .iter() .cloned() - .map(|slashing| (slashing.signed_header_1.message.proposer_index, slashing)) + .map(|slashing| (slashing.as_inner().proposer_index(), slashing)) .collect(), ); let voluntary_exits = RwLock::new( - self.voluntary_exits() + self.voluntary_exits()? .iter() .cloned() - .map(|exit| (exit.message.validator_index, exit)) + .map(|exit| (exit.as_inner().message.validator_index, exit)) .collect(), ); - let op_pool = match self { - PersistedOperationPool::Altair(_) => { - let sync_contributions = - RwLock::new(self.sync_contributions()?.iter().cloned().collect()); - - OperationPool { - attestations, - sync_contributions, - attester_slashings, - proposer_slashings, - voluntary_exits, - _phantom: Default::default(), + let sync_contributions = RwLock::new(self.sync_contributions().iter().cloned().collect()); + let attestations = match self { + PersistedOperationPool::V5(_) => return Err(OpPoolError::IncorrectOpPoolVariant), + PersistedOperationPool::V12(pool) => { + let mut map = AttestationMap::default(); + for (att, attesting_indices) in pool.attestations { + map.insert(att, attesting_indices); } + RwLock::new(map) } }; + let op_pool = OperationPool { + attestations, + sync_contributions, + attester_slashings, + proposer_slashings, + voluntary_exits, + reward_cache: Default::default(), + _phantom: Default::default(), + }; Ok(op_pool) } } -/// Deserialization for `PersistedOperationPool` defaults to `PersistedOperationPool::Altair`. +impl StoreItem for PersistedOperationPoolV5 { + fn db_column() -> DBColumn { + DBColumn::OpPool + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + PersistedOperationPoolV5::from_ssz_bytes(bytes).map_err(Into::into) + } +} + +/// Deserialization for `PersistedOperationPool` defaults to `PersistedOperationPool::V12`. impl StoreItem for PersistedOperationPool { fn db_column() -> DBColumn { DBColumn::OpPool @@ -141,9 +174,9 @@ impl StoreItem for PersistedOperationPool { } fn from_store_bytes(bytes: &[u8]) -> Result { - // Default deserialization to the Altair variant. - PersistedOperationPoolAltair::from_ssz_bytes(bytes) - .map(Self::Altair) + // Default deserialization to the latest variant. + PersistedOperationPoolV12::from_ssz_bytes(bytes) + .map(Self::V12) .map_err(Into::into) } } diff --git a/beacon_node/operation_pool/src/reward_cache.rs b/beacon_node/operation_pool/src/reward_cache.rs new file mode 100644 index 0000000000..5b9d4258e9 --- /dev/null +++ b/beacon_node/operation_pool/src/reward_cache.rs @@ -0,0 +1,122 @@ +use crate::OpPoolError; +use bitvec::vec::BitVec; +use types::{BeaconState, BeaconStateError, Epoch, EthSpec, Hash256, ParticipationFlags}; + +#[derive(Debug, PartialEq, Eq, Clone)] +struct Initialization { + current_epoch: Epoch, + latest_block_root: Hash256, +} + +/// Cache to store pre-computed information for block proposal. +#[derive(Debug, Clone, Default)] +pub struct RewardCache { + initialization: Option, + /// `BitVec` of validator indices which don't have default participation flags for the prev. epoch. + /// + /// We choose to only track whether validators have *any* participation flag set because + /// it's impossible to include a new attestation which is better than the existing participation + /// UNLESS the validator makes a slashable attestation, and we assume that this is rare enough + /// that it's acceptable to be slightly sub-optimal in this case. + previous_epoch_participation: BitVec, + /// `BitVec` of validator indices which don't have default participation flags for the current epoch. + current_epoch_participation: BitVec, +} + +impl RewardCache { + pub fn has_attested_in_epoch( + &self, + validator_index: u64, + epoch: Epoch, + ) -> Result { + if let Some(init) = &self.initialization { + if init.current_epoch == epoch { + Ok(*self + .current_epoch_participation + .get(validator_index as usize) + .ok_or(OpPoolError::RewardCacheOutOfBounds)?) + } else if init.current_epoch == epoch + 1 { + Ok(*self + .previous_epoch_participation + .get(validator_index as usize) + .ok_or(OpPoolError::RewardCacheOutOfBounds)?) + } else { + Err(OpPoolError::RewardCacheWrongEpoch) + } + } else { + Err(OpPoolError::RewardCacheWrongEpoch) + } + } + + /// Return the root of the latest block applied to `state`. + /// + /// For simplicity at genesis we return the zero hash, which will cause one unnecessary + /// re-calculation in `update`. + fn latest_block_root(state: &BeaconState) -> Result { + if state.slot() == 0 { + Ok(Hash256::zero()) + } else { + Ok(*state + .get_block_root(state.slot() - 1) + .map_err(OpPoolError::RewardCacheGetBlockRoot)?) + } + } + + /// Update the cache. + pub fn update(&mut self, state: &BeaconState) -> Result<(), OpPoolError> { + if matches!(state, BeaconState::Base(_)) { + return Ok(()); + } + + let current_epoch = state.current_epoch(); + let latest_block_root = Self::latest_block_root(state)?; + + let new_init = Initialization { + current_epoch, + latest_block_root, + }; + + // The participation flags change every block, and will almost always need updating when + // this function is called at a new slot. + if self + .initialization + .as_ref() + .map_or(true, |init| *init != new_init) + { + self.update_previous_epoch_participation(state) + .map_err(OpPoolError::RewardCacheUpdatePrevEpoch)?; + self.update_current_epoch_participation(state) + .map_err(OpPoolError::RewardCacheUpdateCurrEpoch)?; + + self.initialization = Some(new_init); + } + + Ok(()) + } + + fn update_previous_epoch_participation( + &mut self, + state: &BeaconState, + ) -> Result<(), BeaconStateError> { + let default_participation = ParticipationFlags::default(); + self.previous_epoch_participation = state + .previous_epoch_participation()? + .iter() + .map(|participation| *participation != default_participation) + .collect(); + Ok(()) + } + + fn update_current_epoch_participation( + &mut self, + state: &BeaconState, + ) -> Result<(), BeaconStateError> { + let default_participation = ParticipationFlags::default(); + self.current_epoch_participation = state + .current_epoch_participation()? + .iter() + .map(|participation| *participation != default_participation) + .collect(); + Ok(()) + } +} diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index bbb904717b..3df95a0a5d 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -727,6 +727,16 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .default_value("250") .takes_value(true) ) + .arg( + Arg::with_name("paranoid-block-proposal") + .long("paranoid-block-proposal") + .help("Paranoid enough to be reading the source? Nice. This flag reverts some \ + block proposal optimisations and forces the node to check every attestation \ + it includes super thoroughly. This may be useful in an emergency, but not \ + otherwise.") + .hidden(true) + .takes_value(false) + ) .arg( Arg::with_name("builder-fallback-skips") .long("builder-fallback-skips") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index e885275b04..f08981b103 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -644,6 +644,8 @@ pub fn get_config( client_config.chain.count_unrealized = clap_utils::parse_required(cli_args, "count-unrealized")?; + client_config.chain.paranoid_block_proposal = cli_args.is_present("paranoid-block-proposal"); + /* * Builder fallback configs. */ diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index d72dbcd23d..4f35c4c072 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -4,7 +4,7 @@ use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use types::{Checkpoint, Hash256, Slot}; -pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(11); +pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(12); // All the keys that get stored under the `BeaconMeta` column. // diff --git a/book/src/database-migrations.md b/book/src/database-migrations.md index de7ced1331..c31e373b48 100644 --- a/book/src/database-migrations.md +++ b/book/src/database-migrations.md @@ -23,6 +23,7 @@ validator client or the slasher**. | v2.4.0 | Jul 2022 | v9 | yes (pre Bellatrix) | | v2.5.0 | Aug 2022 | v11 | yes | | v3.0.0 | Aug 2022 | v11 | yes | +| v3.1.0 | Sep 2022 | v12 | yes | > **Note**: All point releases (e.g. v2.3.1) are schema-compatible with the prior minor release > (e.g. v2.3.0). diff --git a/consensus/state_processing/Cargo.toml b/consensus/state_processing/Cargo.toml index c7ed4b308d..46ac2bae57 100644 --- a/consensus/state_processing/Cargo.toml +++ b/consensus/state_processing/Cargo.toml @@ -14,6 +14,7 @@ bls = { path = "../../crypto/bls" } integer-sqrt = "0.1.5" itertools = "0.10.0" eth2_ssz = "0.4.1" +eth2_ssz_derive = "0.3.0" eth2_ssz_types = "0.2.2" merkle_proof = { path = "../merkle_proof" } safe_arith = { path = "../safe_arith" } @@ -26,6 +27,7 @@ smallvec = "1.6.1" arbitrary = { version = "1.0", features = ["derive"], optional = true } lighthouse_metrics = { path = "../../common/lighthouse_metrics", optional = true } lazy_static = { version = "1.4.0", optional = true } +derivative = "2.1.1" [features] default = ["legacy-arith", "metrics"] diff --git a/consensus/state_processing/src/common/get_attesting_indices.rs b/consensus/state_processing/src/common/get_attesting_indices.rs index fb636f861e..d7d02c3601 100644 --- a/consensus/state_processing/src/common/get_attesting_indices.rs +++ b/consensus/state_processing/src/common/get_attesting_indices.rs @@ -1,12 +1,10 @@ use types::*; /// Returns validator indices which participated in the attestation, sorted by increasing index. -/// -/// Spec v0.12.1 pub fn get_attesting_indices( committee: &[usize], bitlist: &BitList, -) -> Result, BeaconStateError> { +) -> Result, BeaconStateError> { if bitlist.len() != committee.len() { return Err(BeaconStateError::InvalidBitfield); } @@ -15,7 +13,7 @@ pub fn get_attesting_indices( for (i, validator_index) in committee.iter().enumerate() { if let Ok(true) = bitlist.get(i) { - indices.push(*validator_index) + indices.push(*validator_index as u64) } } @@ -23,3 +21,12 @@ pub fn get_attesting_indices( Ok(indices) } + +/// Shortcut for getting the attesting indices while fetching the committee from the state's cache. +pub fn get_attesting_indices_from_state( + state: &BeaconState, + att: &Attestation, +) -> Result, BeaconStateError> { + let committee = state.get_beacon_committee(att.data.slot, att.data.index)?; + get_attesting_indices::(committee.committee, &att.aggregation_bits) +} diff --git a/consensus/state_processing/src/common/get_indexed_attestation.rs b/consensus/state_processing/src/common/get_indexed_attestation.rs index daa1c09307..63f63698e4 100644 --- a/consensus/state_processing/src/common/get_indexed_attestation.rs +++ b/consensus/state_processing/src/common/get_indexed_attestation.rs @@ -14,9 +14,7 @@ pub fn get_indexed_attestation( let attesting_indices = get_attesting_indices::(committee, &attestation.aggregation_bits)?; Ok(IndexedAttestation { - attesting_indices: VariableList::new( - attesting_indices.into_iter().map(|x| x as u64).collect(), - )?, + attesting_indices: VariableList::new(attesting_indices)?, data: attestation.data.clone(), signature: attestation.signature.clone(), }) diff --git a/consensus/state_processing/src/common/mod.rs b/consensus/state_processing/src/common/mod.rs index 334a293ed5..8a2e2439bb 100644 --- a/consensus/state_processing/src/common/mod.rs +++ b/consensus/state_processing/src/common/mod.rs @@ -10,7 +10,7 @@ pub mod base; pub use deposit_data_tree::DepositDataTree; pub use get_attestation_participation::get_attestation_participation_flag_indices; -pub use get_attesting_indices::get_attesting_indices; +pub use get_attesting_indices::{get_attesting_indices, get_attesting_indices_from_state}; pub use get_indexed_attestation::get_indexed_attestation; pub use initiate_validator_exit::initiate_validator_exit; pub use slash_validator::slash_validator; diff --git a/consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs b/consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs index b40f91ce5a..26d2536e5f 100644 --- a/consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs +++ b/consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs @@ -278,8 +278,8 @@ impl ValidatorStatuses { // Loop through the participating validator indices and update the status vec. for validator_index in attesting_indices { self.statuses - .get_mut(validator_index) - .ok_or(BeaconStateError::UnknownValidator(validator_index))? + .get_mut(validator_index as usize) + .ok_or(BeaconStateError::UnknownValidator(validator_index as usize))? .update(&status); } } diff --git a/consensus/state_processing/src/upgrade/altair.rs b/consensus/state_processing/src/upgrade/altair.rs index 5e4fcbcf55..176f1af15c 100644 --- a/consensus/state_processing/src/upgrade/altair.rs +++ b/consensus/state_processing/src/upgrade/altair.rs @@ -32,8 +32,8 @@ pub fn translate_participation( for index in attesting_indices { for flag_index in &participation_flag_indices { epoch_participation - .get_mut(index) - .ok_or(Error::UnknownValidator(index))? + .get_mut(index as usize) + .ok_or(Error::UnknownValidator(index as usize))? .add_flag(*flag_index)?; } } diff --git a/consensus/state_processing/src/verify_operation.rs b/consensus/state_processing/src/verify_operation.rs index 25c2839edd..80dee28f62 100644 --- a/consensus/state_processing/src/verify_operation.rs +++ b/consensus/state_processing/src/verify_operation.rs @@ -5,36 +5,120 @@ use crate::per_block_processing::{ verify_attester_slashing, verify_exit, verify_proposer_slashing, }; use crate::VerifySignatures; +use derivative::Derivative; +use smallvec::{smallvec, SmallVec}; +use ssz::{Decode, Encode}; +use ssz_derive::{Decode, Encode}; +use std::marker::PhantomData; use types::{ - AttesterSlashing, BeaconState, ChainSpec, EthSpec, ProposerSlashing, SignedVoluntaryExit, + AttesterSlashing, BeaconState, ChainSpec, Epoch, EthSpec, Fork, ForkVersion, ProposerSlashing, + SignedVoluntaryExit, }; +const MAX_FORKS_VERIFIED_AGAINST: usize = 2; + /// Wrapper around an operation type that acts as proof that its signature has been checked. /// -/// The inner field is private, meaning instances of this type can only be constructed +/// The inner `op` field is private, meaning instances of this type can only be constructed /// by calling `validate`. -#[derive(Debug, PartialEq, Eq, Clone)] -pub struct SigVerifiedOp(T); +#[derive(Derivative, Debug, Clone, Encode, Decode)] +#[derivative( + PartialEq, + Eq, + Hash(bound = "T: Encode + Decode + std::hash::Hash, E: EthSpec") +)] +pub struct SigVerifiedOp { + op: T, + verified_against: VerifiedAgainst, + #[ssz(skip_serializing, skip_deserializing)] + _phantom: PhantomData, +} + +/// Information about the fork versions that this message was verified against. +/// +/// In general it is not safe to assume that a `SigVerifiedOp` constructed at some point in the past +/// will continue to be valid in the presence of a changing `state.fork()`. The reason for this +/// is that the fork versions that the message's epochs map to might change. +/// +/// For example a proposer slashing at a phase0 slot verified against an Altair state will use +/// the phase0 fork version, but will become invalid once the Bellatrix fork occurs because that +/// slot will start to map to the Altair fork version. This is because `Fork::get_fork_version` only +/// remembers the most recent two forks. +/// +/// In the other direction, a proposer slashing at a Bellatrix slot verified against an Altair state +/// will use the Altair fork version, but will become invalid once the Bellatrix fork occurs because +/// that slot will start to map to the Bellatrix fork version. +/// +/// We need to store multiple `ForkVersion`s because attester slashings contain two indexed +/// attestations which may be signed using different versions. +#[derive(Debug, PartialEq, Eq, Clone, Hash, Encode, Decode)] +pub struct VerifiedAgainst { + fork_versions: SmallVec<[ForkVersion; MAX_FORKS_VERIFIED_AGAINST]>, +} + +impl SigVerifiedOp +where + T: VerifyOperation, + E: EthSpec, +{ + /// This function must be private because it assumes that `op` has already been verified. + fn new(op: T, state: &BeaconState) -> Self { + let verified_against = VerifiedAgainst { + fork_versions: op + .verification_epochs() + .into_iter() + .map(|epoch| state.fork().get_fork_version(epoch)) + .collect(), + }; + + SigVerifiedOp { + op, + verified_against, + _phantom: PhantomData, + } + } -impl SigVerifiedOp { pub fn into_inner(self) -> T { - self.0 + self.op } pub fn as_inner(&self) -> &T { - &self.0 + &self.op + } + + pub fn signature_is_still_valid(&self, current_fork: &Fork) -> bool { + self.as_inner() + .verification_epochs() + .into_iter() + .zip(self.verified_against.fork_versions.iter()) + .all(|(epoch, verified_fork_version)| { + current_fork.get_fork_version(epoch) == *verified_fork_version + }) + } + + /// Return one of the fork versions this message was verified against. + /// + /// This is only required for the v12 schema downgrade and can be deleted once all nodes + /// are upgraded to v12. + pub fn first_fork_verified_against(&self) -> Option { + self.verified_against.fork_versions.first().copied() } } /// Trait for operations that can be verified and transformed into a `SigVerifiedOp`. -pub trait VerifyOperation: Sized { +pub trait VerifyOperation: Encode + Decode + Sized { type Error; fn validate( self, state: &BeaconState, spec: &ChainSpec, - ) -> Result, Self::Error>; + ) -> Result, Self::Error>; + + /// Return the epochs at which parts of this message were verified. + /// + /// These need to map 1-to-1 to the `SigVerifiedOp::verified_against` for this type. + fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]>; } impl VerifyOperation for SignedVoluntaryExit { @@ -44,9 +128,14 @@ impl VerifyOperation for SignedVoluntaryExit { self, state: &BeaconState, spec: &ChainSpec, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { verify_exit(state, &self, VerifySignatures::True, spec)?; - Ok(SigVerifiedOp(self)) + Ok(SigVerifiedOp::new(self, state)) + } + + #[allow(clippy::integer_arithmetic)] + fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> { + smallvec![self.message.epoch] } } @@ -57,9 +146,17 @@ impl VerifyOperation for AttesterSlashing { self, state: &BeaconState, spec: &ChainSpec, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { verify_attester_slashing(state, &self, VerifySignatures::True, spec)?; - Ok(SigVerifiedOp(self)) + Ok(SigVerifiedOp::new(self, state)) + } + + #[allow(clippy::integer_arithmetic)] + fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> { + smallvec![ + self.attestation_1.data.target.epoch, + self.attestation_2.data.target.epoch + ] } } @@ -70,8 +167,18 @@ impl VerifyOperation for ProposerSlashing { self, state: &BeaconState, spec: &ChainSpec, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { verify_proposer_slashing(&self, state, VerifySignatures::True, spec)?; - Ok(SigVerifiedOp(self)) + Ok(SigVerifiedOp::new(self, state)) + } + + #[allow(clippy::integer_arithmetic)] + fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> { + // Only need a single epoch because the slots of the two headers must be equal. + smallvec![self + .signed_header_1 + .message + .slot + .epoch(E::slots_per_epoch())] } } diff --git a/consensus/types/src/proposer_slashing.rs b/consensus/types/src/proposer_slashing.rs index ff12b0611a..ca048b149a 100644 --- a/consensus/types/src/proposer_slashing.rs +++ b/consensus/types/src/proposer_slashing.rs @@ -18,6 +18,13 @@ pub struct ProposerSlashing { pub signed_header_2: SignedBeaconBlockHeader, } +impl ProposerSlashing { + /// Get proposer index, assuming slashing validity has already been checked. + pub fn proposer_index(&self) -> u64 { + self.signed_header_1.message.proposer_index + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 7fd4ad91cf..14934a5669 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -132,6 +132,21 @@ fn fork_choice_before_proposal_timeout_zero() { .with_config(|config| assert_eq!(config.chain.fork_choice_before_proposal_timeout_ms, 0)); } +#[test] +fn paranoid_block_proposal_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| assert!(!config.chain.paranoid_block_proposal)); +} + +#[test] +fn paranoid_block_proposal_on() { + CommandLineTest::new() + .flag("paranoid-block-proposal", None) + .run_with_zero_port() + .with_config(|config| assert!(config.chain.paranoid_block_proposal)); +} + #[test] fn count_unrealized_default() { CommandLineTest::new()