From 66eca1a88218462235cb76a116dc3c6a1853444f Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 29 Aug 2022 09:10:26 +0000 Subject: [PATCH] Refactor op pool for speed and correctness (#3312) ## Proposed Changes This PR has two aims: to speed up attestation packing in the op pool, and to fix bugs in the verification of attester slashings, proposer slashings and voluntary exits. The changes are bundled into a single database schema upgrade (v12). Attestation packing is sped up by removing several inefficiencies: - No more recalculation of `attesting_indices` during packing. - No (unnecessary) examination of the `ParticipationFlags`: a bitfield suffices. See `RewardCache`. - No re-checking of attestation validity during packing: the `AttestationMap` provides attestations which are "correct by construction" (I have checked this using Hydra). - No SSZ re-serialization for the clunky `AttestationId` type (it can be removed in a future release). So far the speed-up seems to be roughly 2-10x, from 500ms down to 50-100ms. Verification of attester slashings, proposer slashings and voluntary exits is fixed by: - Tracking the `ForkVersion`s that were used to verify each message inside the `SigVerifiedOp`. This allows us to quickly re-verify that they match the head state's opinion of what the `ForkVersion` should be at the epoch(s) relevant to the message. - Storing the `SigVerifiedOp` on disk rather than the raw operation. This allows us to continue to track the fork versions after a reboot. This is mostly contained in this commit 52bb1840ae5c4356a8fc3a51e5df23ed65ed2c7f. ## Additional Info The schema upgrade uses the justified state to re-verify attestations and compute `attesting_indices` for them. It will drop any attestations that fail to verify, by the logic that attestations are most valuable in the few slots after they're observed, and are probably stale and useless by the time a node restarts. 
Exits and proposer slashings are similarly re-verified to obtain `SigVerifiedOp`s. This PR contains a runtime killswitch `--paranoid-block-proposal` which opts out of all the optimisations in favour of closely verifying every included message. Although I'm quite sure that the optimisations are correct, this flag could be useful in the event of an unforeseen emergency. Finally, you might notice that the `RewardCache` appears quite useless in its current form because it is only updated on the hot-path immediately before proposal. My hope is that in future we can shift calls to `RewardCache::update` into the background, e.g. while performing the state advance. It is also forward-looking to `tree-states` compatibility, where iterating and indexing `state.{previous,current}_epoch_participation` is expensive and needs to be minimised. --- Cargo.lock | 4 + .../src/attestation_verification.rs | 9 +- beacon_node/beacon_chain/src/beacon_chain.rs | 153 +++- beacon_node/beacon_chain/src/block_reward.rs | 33 +- .../beacon_chain/src/block_verification.rs | 10 +- beacon_node/beacon_chain/src/chain_config.rs | 3 + .../beacon_chain/src/observed_operations.rs | 34 +- beacon_node/beacon_chain/src/schema_change.rs | 11 + .../src/schema_change/migration_schema_v12.rs | 226 +++++ beacon_node/beacon_chain/src/test_utils.rs | 33 +- beacon_node/beacon_chain/tests/store_tests.rs | 4 +- beacon_node/http_api/src/block_rewards.rs | 11 +- beacon_node/http_api/src/lib.rs | 34 +- .../beacon_processor/worker/gossip_methods.rs | 15 +- beacon_node/operation_pool/Cargo.toml | 2 + beacon_node/operation_pool/src/attestation.rs | 74 +- .../operation_pool/src/attestation_id.rs | 37 +- .../operation_pool/src/attestation_storage.rs | 245 ++++++ .../operation_pool/src/attester_slashing.rs | 8 +- beacon_node/operation_pool/src/lib.rs | 811 +++++++++++------- beacon_node/operation_pool/src/max_cover.rs | 22 +- beacon_node/operation_pool/src/metrics.rs | 4 + beacon_node/operation_pool/src/persistence.rs | 121 
++- .../operation_pool/src/reward_cache.rs | 122 +++ beacon_node/src/cli.rs | 10 + beacon_node/src/config.rs | 2 + beacon_node/store/src/metadata.rs | 2 +- book/src/database-migrations.md | 1 + consensus/state_processing/Cargo.toml | 2 + .../src/common/get_attesting_indices.rs | 15 +- .../src/common/get_indexed_attestation.rs | 4 +- consensus/state_processing/src/common/mod.rs | 2 +- .../base/validator_statuses.rs | 4 +- .../state_processing/src/upgrade/altair.rs | 4 +- .../state_processing/src/verify_operation.rs | 137 ++- consensus/types/src/proposer_slashing.rs | 7 + lighthouse/tests/beacon_node.rs | 15 + 37 files changed, 1710 insertions(+), 521 deletions(-) create mode 100644 beacon_node/beacon_chain/src/schema_change/migration_schema_v12.rs create mode 100644 beacon_node/operation_pool/src/attestation_storage.rs create mode 100644 beacon_node/operation_pool/src/reward_cache.rs diff --git a/Cargo.lock b/Cargo.lock index 5a2c4312b1..46cd2d96f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4410,12 +4410,14 @@ name = "operation_pool" version = "0.2.0" dependencies = [ "beacon_chain", + "bitvec 1.0.1", "derivative", "eth2_ssz", "eth2_ssz_derive", "itertools", "lazy_static", "lighthouse_metrics", + "maplit", "parking_lot 0.12.1", "rayon", "serde", @@ -6271,9 +6273,11 @@ dependencies = [ "arbitrary", "beacon_chain", "bls", + "derivative", "env_logger 0.9.0", "eth2_hashing", "eth2_ssz", + "eth2_ssz_derive", "eth2_ssz_types", "int_to_bytes", "integer-sqrt", diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index 63af6ab9e1..b60ce7efe5 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -318,10 +318,17 @@ impl<'a, T: BeaconChainTypes> Clone for IndexedUnaggregatedAttestation<'a, T> { /// A helper trait implemented on wrapper types that can be progressed to a state where they can be /// verified for application 
to fork choice. -pub trait VerifiedAttestation { +pub trait VerifiedAttestation: Sized { fn attestation(&self) -> &Attestation; fn indexed_attestation(&self) -> &IndexedAttestation; + + // Inefficient default implementation. This is overridden for gossip verified attestations. + fn into_attestation_and_indices(self) -> (Attestation, Vec) { + let attestation = self.attestation().clone(); + let attesting_indices = self.indexed_attestation().attesting_indices.clone().into(); + (attestation, attesting_indices) + } } impl<'a, T: BeaconChainTypes> VerifiedAttestation for VerifiedAggregatedAttestation<'a, T> { diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 4d37926dd9..fdcd3eed88 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -63,7 +63,7 @@ use fork_choice::{ use futures::channel::mpsc::Sender; use itertools::process_results; use itertools::Itertools; -use operation_pool::{OperationPool, PersistedOperationPool}; +use operation_pool::{AttestationRef, OperationPool, PersistedOperationPool}; use parking_lot::{Mutex, RwLock}; use safe_arith::SafeArith; use slasher::Slasher; @@ -71,12 +71,15 @@ use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; use ssz::Encode; use state_processing::{ - common::get_indexed_attestation, + common::{get_attesting_indices_from_state, get_indexed_attestation}, per_block_processing, - per_block_processing::errors::AttestationValidationError, + per_block_processing::{ + errors::AttestationValidationError, verify_attestation_for_block_inclusion, + VerifySignatures, + }, per_slot_processing, state_advance::{complete_state_advance, partial_state_advance}, - BlockSignatureStrategy, SigVerifiedOp, VerifyBlockRoot, + BlockSignatureStrategy, SigVerifiedOp, VerifyBlockRoot, VerifyOperation, }; use std::cmp::Ordering; use std::collections::HashMap; @@ -1904,25 +1907,22 @@ impl BeaconChain { /// 
Accepts a `VerifiedAttestation` and attempts to apply it to `self.op_pool`. /// /// The op pool is used by local block producers to pack blocks with operations. - pub fn add_to_block_inclusion_pool( + pub fn add_to_block_inclusion_pool( &self, - verified_attestation: &impl VerifiedAttestation, - ) -> Result<(), AttestationError> { + verified_attestation: A, + ) -> Result<(), AttestationError> + where + A: VerifiedAttestation, + { let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_APPLY_TO_OP_POOL); // If there's no eth1 chain then it's impossible to produce blocks and therefore // useless to put things in the op pool. if self.eth1_chain.is_some() { - let fork = self.canonical_head.cached_head().head_fork(); - + let (attestation, attesting_indices) = + verified_attestation.into_attestation_and_indices(); self.op_pool - .insert_attestation( - // TODO: address this clone. - verified_attestation.attestation().clone(), - &fork, - self.genesis_validators_root, - &self.spec, - ) + .insert_attestation(attestation, attesting_indices) .map_err(Error::from)?; } @@ -1955,15 +1955,15 @@ impl BeaconChain { pub fn filter_op_pool_attestation( &self, filter_cache: &mut HashMap<(Hash256, Epoch), bool>, - att: &Attestation, + att: &AttestationRef, state: &BeaconState, ) -> bool { *filter_cache - .entry((att.data.beacon_block_root, att.data.target.epoch)) + .entry((att.data.beacon_block_root, att.checkpoint.target_epoch)) .or_insert_with(|| { self.shuffling_is_compatible( &att.data.beacon_block_root, - att.data.target.epoch, + att.checkpoint.target_epoch, state, ) }) @@ -2045,7 +2045,7 @@ impl BeaconChain { pub fn verify_voluntary_exit_for_gossip( &self, exit: SignedVoluntaryExit, - ) -> Result, Error> { + ) -> Result, Error> { // NOTE: this could be more efficient if it avoided cloning the head state let wall_clock_state = self.wall_clock_state()?; Ok(self @@ -2066,7 +2066,7 @@ impl BeaconChain { } /// Accept a pre-verified exit and queue it for inclusion in an 
appropriate block. - pub fn import_voluntary_exit(&self, exit: SigVerifiedOp) { + pub fn import_voluntary_exit(&self, exit: SigVerifiedOp) { if self.eth1_chain.is_some() { self.op_pool.insert_voluntary_exit(exit) } @@ -2076,7 +2076,7 @@ impl BeaconChain { pub fn verify_proposer_slashing_for_gossip( &self, proposer_slashing: ProposerSlashing, - ) -> Result, Error> { + ) -> Result, Error> { let wall_clock_state = self.wall_clock_state()?; Ok(self.observed_proposer_slashings.lock().verify_and_observe( proposer_slashing, @@ -2086,7 +2086,10 @@ impl BeaconChain { } /// Accept some proposer slashing and queue it for inclusion in an appropriate block. - pub fn import_proposer_slashing(&self, proposer_slashing: SigVerifiedOp) { + pub fn import_proposer_slashing( + &self, + proposer_slashing: SigVerifiedOp, + ) { if self.eth1_chain.is_some() { self.op_pool.insert_proposer_slashing(proposer_slashing) } @@ -2096,7 +2099,7 @@ impl BeaconChain { pub fn verify_attester_slashing_for_gossip( &self, attester_slashing: AttesterSlashing, - ) -> Result>, Error> { + ) -> Result, T::EthSpec>, Error> { let wall_clock_state = self.wall_clock_state()?; Ok(self.observed_attester_slashings.lock().verify_and_observe( attester_slashing, @@ -2111,7 +2114,7 @@ impl BeaconChain { /// 2. Add it to the op pool. pub fn import_attester_slashing( &self, - attester_slashing: SigVerifiedOp>, + attester_slashing: SigVerifiedOp, T::EthSpec>, ) { // Add to fork choice. self.canonical_head @@ -2120,10 +2123,7 @@ impl BeaconChain { // Add to the op pool (if we have the ability to propose blocks). 
if self.eth1_chain.is_some() { - self.op_pool.insert_attester_slashing( - attester_slashing, - self.canonical_head.cached_head().head_fork(), - ) + self.op_pool.insert_attester_slashing(attester_slashing) } } @@ -3351,7 +3351,7 @@ impl BeaconChain { } }; - let (proposer_slashings, attester_slashings, voluntary_exits) = + let (mut proposer_slashings, mut attester_slashings, mut voluntary_exits) = self.op_pool.get_slashings_and_exits(&state, &self.spec); let eth1_data = eth1_chain.eth1_data_for_block_production(&state, &self.spec)?; @@ -3362,12 +3362,12 @@ impl BeaconChain { let unagg_import_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_UNAGGREGATED_TIMES); for attestation in self.naive_aggregation_pool.read().iter() { - if let Err(e) = self.op_pool.insert_attestation( - attestation.clone(), - &state.fork(), - state.genesis_validators_root(), - &self.spec, - ) { + let import = |attestation: &Attestation| { + let attesting_indices = get_attesting_indices_from_state(&state, attestation)?; + self.op_pool + .insert_attestation(attestation.clone(), attesting_indices) + }; + if let Err(e) = import(attestation) { // Don't stop block production if there's an error, just create a log. 
error!( self.log, @@ -3388,15 +3388,15 @@ impl BeaconChain { metrics::start_timer(&metrics::BLOCK_PRODUCTION_ATTESTATION_TIMES); let mut prev_filter_cache = HashMap::new(); - let prev_attestation_filter = |att: &&Attestation| { - self.filter_op_pool_attestation(&mut prev_filter_cache, *att, &state) + let prev_attestation_filter = |att: &AttestationRef| { + self.filter_op_pool_attestation(&mut prev_filter_cache, att, &state) }; let mut curr_filter_cache = HashMap::new(); - let curr_attestation_filter = |att: &&Attestation| { - self.filter_op_pool_attestation(&mut curr_filter_cache, *att, &state) + let curr_attestation_filter = |att: &AttestationRef| { + self.filter_op_pool_attestation(&mut curr_filter_cache, att, &state) }; - let attestations = self + let mut attestations = self .op_pool .get_attestations( &state, @@ -3407,6 +3407,77 @@ impl BeaconChain { .map_err(BlockProductionError::OpPoolError)?; drop(attestation_packing_timer); + // If paranoid mode is enabled re-check the signatures of every included message. + // This will be a lot slower but guards against bugs in block production and can be + // quickly rolled out without a release. 
+ if self.config.paranoid_block_proposal { + attestations.retain(|att| { + verify_attestation_for_block_inclusion( + &state, + att, + VerifySignatures::True, + &self.spec, + ) + .map_err(|e| { + warn!( + self.log, + "Attempted to include an invalid attestation"; + "err" => ?e, + "block_slot" => state.slot(), + "attestation" => ?att + ); + }) + .is_ok() + }); + + proposer_slashings.retain(|slashing| { + slashing + .clone() + .validate(&state, &self.spec) + .map_err(|e| { + warn!( + self.log, + "Attempted to include an invalid proposer slashing"; + "err" => ?e, + "block_slot" => state.slot(), + "slashing" => ?slashing + ); + }) + .is_ok() + }); + + attester_slashings.retain(|slashing| { + slashing + .clone() + .validate(&state, &self.spec) + .map_err(|e| { + warn!( + self.log, + "Attempted to include an invalid attester slashing"; + "err" => ?e, + "block_slot" => state.slot(), + "slashing" => ?slashing + ); + }) + .is_ok() + }); + + voluntary_exits.retain(|exit| { + exit.clone() + .validate(&state, &self.spec) + .map_err(|e| { + warn!( + self.log, + "Attempted to include an invalid voluntary exit"; + "err" => ?e, + "block_slot" => state.slot(), + "exit" => ?exit + ); + }) + .is_ok() + }); + } + let slot = state.slot(); + let proposer_index = state.get_beacon_proposer_index(state.slot(), &self.spec)? 
as u64; diff --git a/beacon_node/beacon_chain/src/block_reward.rs b/beacon_node/beacon_chain/src/block_reward.rs index 4b8b809d3f..3bddd2a521 100644 --- a/beacon_node/beacon_chain/src/block_reward.rs +++ b/beacon_node/beacon_chain/src/block_reward.rs @@ -1,7 +1,10 @@ use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2::lighthouse::{AttestationRewards, BlockReward, BlockRewardMeta}; -use operation_pool::{AttMaxCover, MaxCover}; -use state_processing::per_block_processing::altair::sync_committee::compute_sync_aggregate_rewards; +use operation_pool::{AttMaxCover, MaxCover, RewardCache, SplitAttestation}; +use state_processing::{ + common::get_attesting_indices_from_state, + per_block_processing::altair::sync_committee::compute_sync_aggregate_rewards, +}; use types::{BeaconBlockRef, BeaconState, EthSpec, ExecPayload, Hash256}; impl BeaconChain { @@ -10,20 +13,38 @@ impl BeaconChain { block: BeaconBlockRef<'_, T::EthSpec, Payload>, block_root: Hash256, state: &BeaconState, + reward_cache: &mut RewardCache, include_attestations: bool, ) -> Result { if block.slot() != state.slot() { return Err(BeaconChainError::BlockRewardSlotError); } + reward_cache.update(state)?; + let total_active_balance = state.get_total_active_balance()?; - let mut per_attestation_rewards = block + + let split_attestations = block .body() .attestations() .iter() .map(|att| { - AttMaxCover::new(att, state, total_active_balance, &self.spec) - .ok_or(BeaconChainError::BlockRewardAttestationError) + let attesting_indices = get_attesting_indices_from_state(state, att)?; + Ok(SplitAttestation::new(att.clone(), attesting_indices)) + }) + .collect::, BeaconChainError>>()?; + + let mut per_attestation_rewards = split_attestations + .iter() + .map(|att| { + AttMaxCover::new( + att.as_ref(), + state, + reward_cache, + total_active_balance, + &self.spec, + ) + .ok_or(BeaconChainError::BlockRewardAttestationError) }) .collect::, _>>()?; @@ -34,7 +55,7 @@ impl BeaconChain { let latest_att = 
&updated[i]; for att in to_update { - att.update_covering_set(latest_att.object(), latest_att.covering_set()); + att.update_covering_set(latest_att.intermediate(), latest_att.covering_set()); } } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 95d5f818f0..cdcbf3f68e 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1307,8 +1307,14 @@ impl ExecutionPendingBlock { */ if let Some(ref event_handler) = chain.event_handler { if event_handler.has_block_reward_subscribers() { - let block_reward = - chain.compute_block_reward(block.message(), block_root, &state, true)?; + let mut reward_cache = Default::default(); + let block_reward = chain.compute_block_reward( + block.message(), + block_root, + &state, + &mut reward_cache, + true, + )?; event_handler.register(EventKind::BlockReward(block_reward)); } } diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index aa7ff02af1..ba3a0b628c 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -35,6 +35,8 @@ pub struct ChainConfig { /// Whether any chain health checks should be considered when deciding whether to use the builder API. pub builder_fallback_disable_checks: bool, pub count_unrealized: bool, + /// Whether to apply paranoid checks to blocks proposed by this beacon node. 
+ pub paranoid_block_proposal: bool, } impl Default for ChainConfig { @@ -52,6 +54,7 @@ impl Default for ChainConfig { builder_fallback_epochs_since_finalization: 3, builder_fallback_disable_checks: false, count_unrealized: true, + paranoid_block_proposal: false, } } } diff --git a/beacon_node/beacon_chain/src/observed_operations.rs b/beacon_node/beacon_chain/src/observed_operations.rs index f1eb996a54..8d8272b67d 100644 --- a/beacon_node/beacon_chain/src/observed_operations.rs +++ b/beacon_node/beacon_chain/src/observed_operations.rs @@ -1,10 +1,12 @@ use derivative::Derivative; use smallvec::SmallVec; +use ssz::{Decode, Encode}; use state_processing::{SigVerifiedOp, VerifyOperation}; use std::collections::HashSet; use std::marker::PhantomData; use types::{ - AttesterSlashing, BeaconState, ChainSpec, EthSpec, ProposerSlashing, SignedVoluntaryExit, + AttesterSlashing, BeaconState, ChainSpec, EthSpec, ForkName, ProposerSlashing, + SignedVoluntaryExit, Slot, }; /// Number of validator indices to store on the stack in `observed_validators`. @@ -24,13 +26,16 @@ pub struct ObservedOperations, E: EthSpec> { /// previously seen attester slashings, i.e. those validators in the intersection of /// `attestation_1.attester_indices` and `attestation_2.attester_indices`. observed_validator_indices: HashSet, + /// The name of the current fork. The default will be overwritten on first use. + #[derivative(Default(value = "ForkName::Base"))] + current_fork: ForkName, _phantom: PhantomData<(T, E)>, } /// Was the observed operation new and valid for further processing, or a useless duplicate? 
#[derive(Debug, PartialEq, Eq, Clone)] -pub enum ObservationOutcome { - New(SigVerifiedOp), +pub enum ObservationOutcome { + New(SigVerifiedOp), AlreadyKnown, } @@ -81,7 +86,9 @@ impl, E: EthSpec> ObservedOperations { op: T, head_state: &BeaconState, spec: &ChainSpec, - ) -> Result, T::Error> { + ) -> Result, T::Error> { + self.reset_at_fork_boundary(head_state.slot(), spec); + let observed_validator_indices = &mut self.observed_validator_indices; let new_validator_indices = op.observed_validators(); @@ -107,4 +114,23 @@ impl, E: EthSpec> ObservedOperations { Ok(ObservationOutcome::New(verified_op)) } + + /// Reset the cache when crossing a fork boundary. + /// + /// This prevents an attacker from crafting a self-slashing which is only valid before the fork + /// (e.g. using the Altair fork domain at a Bellatrix epoch), in order to prevent propagation of + /// all other slashings due to the duplicate check. + /// + /// It doesn't matter if this cache gets reset too often, as we reset it on restart anyway and a + /// false negative just results in propagation of messages which should have been ignored. + /// + /// In future we could check slashing relevance against the op pool itself, but that would + /// require indexing the attester slashings in the op pool by validator index. + fn reset_at_fork_boundary(&mut self, head_slot: Slot, spec: &ChainSpec) { + let head_fork = spec.fork_name_at_slot::(head_slot); + if head_fork != self.current_fork { + self.observed_validator_indices.clear(); + self.current_fork = head_fork; + } + } } diff --git a/beacon_node/beacon_chain/src/schema_change.rs b/beacon_node/beacon_chain/src/schema_change.rs index b6c70b5435..15b0f39f3a 100644 --- a/beacon_node/beacon_chain/src/schema_change.rs +++ b/beacon_node/beacon_chain/src/schema_change.rs @@ -1,6 +1,7 @@ //! Utilities for managing database schema changes. 
mod migration_schema_v10; mod migration_schema_v11; +mod migration_schema_v12; mod migration_schema_v6; mod migration_schema_v7; mod migration_schema_v8; @@ -196,6 +197,16 @@ pub fn migrate_schema( Ok(()) } + // Upgrade from v11 to v12 to store richer metadata in the attestation op pool. + (SchemaVersion(11), SchemaVersion(12)) => { + let ops = migration_schema_v12::upgrade_to_v12::(db.clone(), log)?; + db.store_schema_version_atomically(to, ops) + } + // Downgrade from v12 to v11 to drop richer metadata from the attestation op pool. + (SchemaVersion(12), SchemaVersion(11)) => { + let ops = migration_schema_v12::downgrade_from_v12::(db.clone(), log)?; + db.store_schema_version_atomically(to, ops) + } // Anything else is an error. (_, _) => Err(HotColdDBError::UnsupportedSchemaVersion { target_version: to, diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v12.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v12.rs new file mode 100644 index 0000000000..bb72b28c0e --- /dev/null +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v12.rs @@ -0,0 +1,226 @@ +use crate::beacon_chain::{BeaconChainTypes, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY}; +use crate::persisted_fork_choice::PersistedForkChoiceV11; +use operation_pool::{PersistedOperationPool, PersistedOperationPoolV12, PersistedOperationPoolV5}; +use slog::{debug, info, Logger}; +use state_processing::{ + common::get_indexed_attestation, per_block_processing::is_valid_indexed_attestation, + VerifyOperation, VerifySignatures, +}; +use std::sync::Arc; +use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem}; + +pub fn upgrade_to_v12( + db: Arc>, + log: Logger, +) -> Result, Error> { + let spec = db.get_chain_spec(); + + // Load a V5 op pool and transform it to V12. + let PersistedOperationPoolV5 { + attestations_v5, + sync_contributions, + attester_slashings_v5, + proposer_slashings_v5, + voluntary_exits_v5, + } = if let Some(op_pool) = db.get_item(&OP_POOL_DB_KEY)? 
{ + op_pool + } else { + debug!(log, "Nothing to do, no operation pool stored"); + return Ok(vec![]); + }; + + // Load the persisted fork choice so we can grab the state of the justified block and use + // it to verify the stored attestations, slashings and exits. + let fork_choice = db + .get_item::(&FORK_CHOICE_DB_KEY)? + .ok_or_else(|| Error::SchemaMigrationError("fork choice missing from database".into()))?; + let justified_block_root = fork_choice + .fork_choice_store + .unrealized_justified_checkpoint + .root; + let justified_block = db + .get_blinded_block(&justified_block_root)? + .ok_or_else(|| { + Error::SchemaMigrationError(format!( + "unrealized justified block missing for migration: {justified_block_root:?}", + )) + })?; + let justified_state_root = justified_block.state_root(); + let mut state = db + .get_state(&justified_state_root, Some(justified_block.slot()))? + .ok_or_else(|| { + Error::SchemaMigrationError(format!( + "justified state missing for migration: {justified_state_root:?}" + )) + })?; + state.build_all_committee_caches(spec).map_err(|e| { + Error::SchemaMigrationError(format!("unable to build committee caches: {e:?}")) + })?; + + // Re-verify attestations while adding attesting indices. 
+ let attestations = attestations_v5 + .into_iter() + .flat_map(|(_, attestations)| attestations) + .filter_map(|attestation| { + let res = state + .get_beacon_committee(attestation.data.slot, attestation.data.index) + .map_err(Into::into) + .and_then(|committee| get_indexed_attestation(committee.committee, &attestation)) + .and_then(|indexed_attestation| { + is_valid_indexed_attestation( + &state, + &indexed_attestation, + VerifySignatures::True, + spec, + )?; + Ok(indexed_attestation) + }); + + match res { + Ok(indexed) => Some((attestation, indexed.attesting_indices.into())), + Err(e) => { + debug!( + log, + "Dropping attestation on migration"; + "err" => ?e, + "head_block" => ?attestation.data.beacon_block_root, + ); + None + } + } + }) + .collect::>(); + + let attester_slashings = attester_slashings_v5 + .iter() + .filter_map(|(slashing, _)| { + slashing + .clone() + .validate(&state, spec) + .map_err(|e| { + debug!( + log, + "Dropping attester slashing on migration"; + "err" => ?e, + "slashing" => ?slashing, + ); + }) + .ok() + }) + .collect::>(); + + let proposer_slashings = proposer_slashings_v5 + .iter() + .filter_map(|slashing| { + slashing + .clone() + .validate(&state, spec) + .map_err(|e| { + debug!( + log, + "Dropping proposer slashing on migration"; + "err" => ?e, + "slashing" => ?slashing, + ); + }) + .ok() + }) + .collect::>(); + + let voluntary_exits = voluntary_exits_v5 + .iter() + .filter_map(|exit| { + exit.clone() + .validate(&state, spec) + .map_err(|e| { + debug!( + log, + "Dropping voluntary exit on migration"; + "err" => ?e, + "exit" => ?exit, + ); + }) + .ok() + }) + .collect::>(); + + debug!( + log, + "Migrated op pool"; + "attestations" => attestations.len(), + "attester_slashings" => attester_slashings.len(), + "proposer_slashings" => proposer_slashings.len(), + "voluntary_exits" => voluntary_exits.len() + ); + + let v12 = PersistedOperationPool::V12(PersistedOperationPoolV12 { + attestations, + sync_contributions, + 
attester_slashings, + proposer_slashings, + voluntary_exits, + }); + Ok(vec![v12.as_kv_store_op(OP_POOL_DB_KEY)]) +} + +pub fn downgrade_from_v12( + db: Arc>, + log: Logger, +) -> Result, Error> { + // Load a V12 op pool and transform it to V5. + let PersistedOperationPoolV12 { + attestations, + sync_contributions, + attester_slashings, + proposer_slashings, + voluntary_exits, + } = if let Some(PersistedOperationPool::::V12(op_pool)) = + db.get_item(&OP_POOL_DB_KEY)? + { + op_pool + } else { + debug!(log, "Nothing to do, no operation pool stored"); + return Ok(vec![]); + }; + + info!( + log, + "Dropping attestations from pool"; + "count" => attestations.len(), + ); + + let attester_slashings_v5 = attester_slashings + .into_iter() + .filter_map(|slashing| { + let fork_version = slashing.first_fork_verified_against()?; + Some((slashing.into_inner(), fork_version)) + }) + .collect::>(); + + let proposer_slashings_v5 = proposer_slashings + .into_iter() + .map(|slashing| slashing.into_inner()) + .collect::>(); + + let voluntary_exits_v5 = voluntary_exits + .into_iter() + .map(|exit| exit.into_inner()) + .collect::>(); + + info!( + log, + "Migrated slashings and exits"; + "attester_slashings" => attester_slashings_v5.len(), + "proposer_slashings" => proposer_slashings_v5.len(), + "voluntary_exits" => voluntary_exits_v5.len(), + ); + + let v5 = PersistedOperationPoolV5 { + attestations_v5: vec![], + sync_contributions, + attester_slashings_v5, + proposer_slashings_v5, + voluntary_exits_v5, + }; + Ok(vec![v5.as_kv_store_op(OP_POOL_DB_KEY)]) +} diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 9b62590703..a62608202e 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -1175,6 +1175,19 @@ where } pub fn make_attester_slashing(&self, validator_indices: Vec) -> AttesterSlashing { + self.make_attester_slashing_with_epochs(validator_indices, None, None, None, None) + } 
+ + pub fn make_attester_slashing_with_epochs( + &self, + validator_indices: Vec, + source1: Option, + target1: Option, + source2: Option, + target2: Option, + ) -> AttesterSlashing { + let fork = self.chain.canonical_head.cached_head().head_fork(); + let mut attestation_1 = IndexedAttestation { attesting_indices: VariableList::new(validator_indices).unwrap(), data: AttestationData { @@ -1183,11 +1196,11 @@ where beacon_block_root: Hash256::zero(), target: Checkpoint { root: Hash256::zero(), - epoch: Epoch::new(0), + epoch: target1.unwrap_or(fork.epoch), }, source: Checkpoint { root: Hash256::zero(), - epoch: Epoch::new(0), + epoch: source1.unwrap_or(Epoch::new(0)), }, }, signature: AggregateSignature::infinity(), @@ -1195,8 +1208,9 @@ where let mut attestation_2 = attestation_1.clone(); attestation_2.data.index += 1; + attestation_2.data.source.epoch = source2.unwrap_or(Epoch::new(0)); + attestation_2.data.target.epoch = target2.unwrap_or(fork.epoch); - let fork = self.chain.canonical_head.cached_head().head_fork(); for attestation in &mut [&mut attestation_1, &mut attestation_2] { for &i in &attestation.attesting_indices { let sk = &self.validator_keypairs[i as usize].sk; @@ -1280,8 +1294,19 @@ where } pub fn make_proposer_slashing(&self, validator_index: u64) -> ProposerSlashing { + self.make_proposer_slashing_at_slot(validator_index, None) + } + + pub fn make_proposer_slashing_at_slot( + &self, + validator_index: u64, + slot_override: Option, + ) -> ProposerSlashing { let mut block_header_1 = self.chain.head_beacon_block().message().block_header(); block_header_1.proposer_index = validator_index; + if let Some(slot) = slot_override { + block_header_1.slot = slot; + } let mut block_header_2 = block_header_1.clone(); block_header_2.state_root = Hash256::zero(); @@ -1488,7 +1513,7 @@ where self.chain .apply_attestation_to_fork_choice(&verified) .unwrap(); - self.chain.add_to_block_inclusion_pool(&verified).unwrap(); + 
self.chain.add_to_block_inclusion_pool(verified).unwrap(); } } diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index d9d5ca20d7..afd97750a6 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -795,9 +795,7 @@ async fn multiple_attestations_per_block() { snapshot .beacon_block .as_ref() - .clone() - .deconstruct() - .0 + .message() .body() .attestations() .len() as u64, diff --git a/beacon_node/http_api/src/block_rewards.rs b/beacon_node/http_api/src/block_rewards.rs index 682828aee4..3b81b894db 100644 --- a/beacon_node/http_api/src/block_rewards.rs +++ b/beacon_node/http_api/src/block_rewards.rs @@ -52,6 +52,7 @@ pub fn get_block_rewards( .build_all_caches(&chain.spec) .map_err(beacon_state_error)?; + let mut reward_cache = Default::default(); let mut block_rewards = Vec::with_capacity(blocks.len()); let block_replayer = BlockReplayer::new(state, &chain.spec) @@ -63,6 +64,7 @@ pub fn get_block_rewards( block.message(), block.canonical_root(), state, + &mut reward_cache, query.include_attestations, )?; block_rewards.push(block_reward); @@ -100,6 +102,7 @@ pub fn compute_block_rewards( ) -> Result, warp::Rejection> { let mut block_rewards = Vec::with_capacity(blocks.len()); let mut state_cache = LruCache::new(STATE_CACHE_SIZE); + let mut reward_cache = Default::default(); for block in blocks { let parent_root = block.parent_root(); @@ -170,7 +173,13 @@ pub fn compute_block_rewards( // Compute block reward. 
let block_reward = chain - .compute_block_reward(block.to_ref(), block.canonical_root(), state, true) + .compute_block_reward( + block.to_ref(), + block.canonical_root(), + state, + &mut reward_cache, + true, + ) .map_err(beacon_chain_error)?; block_rewards.push(block_reward); } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 59e6554aee..48178f4f0d 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -45,11 +45,12 @@ use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use types::{ - Attestation, AttesterSlashing, BeaconStateError, BlindedPayload, CommitteeCache, - ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload, ProposerPreparationData, - ProposerSlashing, RelativeEpoch, Signature, SignedAggregateAndProof, SignedBeaconBlock, - SignedBlindedBeaconBlock, SignedContributionAndProof, SignedValidatorRegistrationData, - SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData, + Attestation, AttestationData, AttesterSlashing, BeaconStateError, BlindedPayload, + CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload, + ProposerPreparationData, ProposerSlashing, RelativeEpoch, Signature, SignedAggregateAndProof, + SignedBeaconBlock, SignedBlindedBeaconBlock, SignedContributionAndProof, + SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage, + SyncContributionData, }; use version::{ add_consensus_version_header, execution_optimistic_fork_versioned_response, @@ -1305,13 +1306,11 @@ pub fn serve( .and_then( |chain: Arc>, query: api_types::AttestationPoolQuery| { blocking_json_task(move || { - let query_filter = |attestation: &Attestation| { - query - .slot - .map_or(true, |slot| slot == attestation.data.slot) + let query_filter = |data: &AttestationData| { + query.slot.map_or(true, |slot| slot == data.slot) && query .committee_index - .map_or(true, |index| index == 
attestation.data.index) + .map_or(true, |index| index == data.index) }; let mut attestations = chain.op_pool.get_filtered_attestations(query_filter); @@ -1321,7 +1320,7 @@ pub fn serve( .read() .iter() .cloned() - .filter(query_filter), + .filter(|att| query_filter(&att.data)), ); Ok(api_types::GenericResponse::from(attestations)) }) @@ -2317,12 +2316,13 @@ pub fn serve( ); failures.push(api_types::Failure::new(index, format!("Fork choice: {:?}", e))); } - if let Err(e) = chain.add_to_block_inclusion_pool(&verified_aggregate) { - warn!(log, - "Could not add verified aggregate attestation to the inclusion pool"; - "error" => format!("{:?}", e), - "request_index" => index, - ); + if let Err(e) = chain.add_to_block_inclusion_pool(verified_aggregate) { + warn!( + log, + "Could not add verified aggregate attestation to the inclusion pool"; + "error" => ?e, + "request_index" => index, + ); failures.push(api_types::Failure::new(index, format!("Op pool: {:?}", e))); } } diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index e6625e43f8..93ed1b463b 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -54,6 +54,12 @@ impl VerifiedAttestation for VerifiedUnaggregate { fn indexed_attestation(&self) -> &IndexedAttestation { &self.indexed_attestation } + + fn into_attestation_and_indices(self) -> (Attestation, Vec) { + let attestation = *self.attestation; + let attesting_indices = self.indexed_attestation.attesting_indices.into(); + (attestation, attesting_indices) + } } /// An attestation that failed validation by the `BeaconChain`. @@ -81,6 +87,13 @@ impl VerifiedAttestation for VerifiedAggregate { fn indexed_attestation(&self) -> &IndexedAttestation { &self.indexed_attestation } + + /// Efficient clone-free implementation that moves out of the `Box`. 
+ fn into_attestation_and_indices(self) -> (Attestation, Vec) { + let attestation = self.signed_aggregate.message.aggregate; + let attesting_indices = self.indexed_attestation.attesting_indices.into(); + (attestation, attesting_indices) + } } /// An attestation that failed validation by the `BeaconChain`. @@ -595,7 +608,7 @@ impl Worker { } } - if let Err(e) = self.chain.add_to_block_inclusion_pool(&verified_aggregate) { + if let Err(e) = self.chain.add_to_block_inclusion_pool(verified_aggregate) { debug!( self.log, "Attestation invalid for op pool"; diff --git a/beacon_node/operation_pool/Cargo.toml b/beacon_node/operation_pool/Cargo.toml index 6b8b8eb145..1d67ecdccc 100644 --- a/beacon_node/operation_pool/Cargo.toml +++ b/beacon_node/operation_pool/Cargo.toml @@ -18,7 +18,9 @@ rayon = "1.5.0" serde = "1.0.116" serde_derive = "1.0.116" store = { path = "../store" } +bitvec = "1" [dev-dependencies] beacon_chain = { path = "../beacon_chain" } tokio = { version = "1.14.0", features = ["rt-multi-thread"] } +maplit = "1.0.2" diff --git a/beacon_node/operation_pool/src/attestation.rs b/beacon_node/operation_pool/src/attestation.rs index 2f7fba4540..4af4edc0e4 100644 --- a/beacon_node/operation_pool/src/attestation.rs +++ b/beacon_node/operation_pool/src/attestation.rs @@ -1,4 +1,6 @@ +use crate::attestation_storage::AttestationRef; use crate::max_cover::MaxCover; +use crate::reward_cache::RewardCache; use state_processing::common::{ altair, base, get_attestation_participation_flag_indices, get_attesting_indices, }; @@ -12,34 +14,35 @@ use types::{ #[derive(Debug, Clone)] pub struct AttMaxCover<'a, T: EthSpec> { /// Underlying attestation. - pub att: &'a Attestation, + pub att: AttestationRef<'a, T>, /// Mapping of validator indices and their rewards. 
pub fresh_validators_rewards: HashMap, } impl<'a, T: EthSpec> AttMaxCover<'a, T> { pub fn new( - att: &'a Attestation, + att: AttestationRef<'a, T>, state: &BeaconState, + reward_cache: &'a RewardCache, total_active_balance: u64, spec: &ChainSpec, ) -> Option { if let BeaconState::Base(ref base_state) = state { Self::new_for_base(att, state, base_state, total_active_balance, spec) } else { - Self::new_for_altair(att, state, total_active_balance, spec) + Self::new_for_altair(att, state, reward_cache, total_active_balance, spec) } } /// Initialise an attestation cover object for base/phase0 hard fork. pub fn new_for_base( - att: &'a Attestation, + att: AttestationRef<'a, T>, state: &BeaconState, base_state: &BeaconStateBase, total_active_balance: u64, spec: &ChainSpec, ) -> Option { - let fresh_validators = earliest_attestation_validators(att, state, base_state); + let fresh_validators = earliest_attestation_validators(&att, state, base_state); let committee = state .get_beacon_committee(att.data.slot, att.data.index) .ok()?; @@ -67,45 +70,41 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> { /// Initialise an attestation cover object for Altair or later. pub fn new_for_altair( - att: &'a Attestation, + att: AttestationRef<'a, T>, state: &BeaconState, + reward_cache: &'a RewardCache, total_active_balance: u64, spec: &ChainSpec, ) -> Option { - let committee = state - .get_beacon_committee(att.data.slot, att.data.index) - .ok()?; - let attesting_indices = - get_attesting_indices::(committee.committee, &att.aggregation_bits).ok()?; + let att_data = att.attestation_data(); - let participation_list = if att.data.target.epoch == state.current_epoch() { - state.current_epoch_participation().ok()? - } else if att.data.target.epoch == state.previous_epoch() { - state.previous_epoch_participation().ok()? 
- } else { - return None; - }; - - let inclusion_delay = state.slot().as_u64().checked_sub(att.data.slot.as_u64())?; + let inclusion_delay = state.slot().as_u64().checked_sub(att_data.slot.as_u64())?; let att_participation_flags = - get_attestation_participation_flag_indices(state, &att.data, inclusion_delay, spec) + get_attestation_participation_flag_indices(state, &att_data, inclusion_delay, spec) .ok()?; let base_reward_per_increment = altair::BaseRewardPerIncrement::new(total_active_balance, spec).ok()?; - let fresh_validators_rewards = attesting_indices + let fresh_validators_rewards = att + .indexed + .attesting_indices .iter() .filter_map(|&index| { + if reward_cache + .has_attested_in_epoch(index, att_data.target.epoch) + .ok()? + { + return None; + } + let mut proposer_reward_numerator = 0; - let participation = participation_list.get(index)?; let base_reward = - altair::get_base_reward(state, index, base_reward_per_increment, spec).ok()?; + altair::get_base_reward(state, index as usize, base_reward_per_increment, spec) + .ok()?; for (flag_index, weight) in PARTICIPATION_FLAG_WEIGHTS.iter().enumerate() { - if att_participation_flags.contains(&flag_index) - && !participation.has_flag(flag_index).ok()? 
- { + if att_participation_flags.contains(&flag_index) { proposer_reward_numerator += base_reward.checked_mul(*weight)?; } } @@ -113,7 +112,7 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> { let proposer_reward = proposer_reward_numerator .checked_div(WEIGHT_DENOMINATOR.checked_mul(spec.proposer_reward_quotient)?)?; - Some((index as u64, proposer_reward)).filter(|_| proposer_reward != 0) + Some((index, proposer_reward)).filter(|_| proposer_reward != 0) }) .collect(); @@ -126,10 +125,15 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> { impl<'a, T: EthSpec> MaxCover for AttMaxCover<'a, T> { type Object = Attestation; + type Intermediate = AttestationRef<'a, T>; type Set = HashMap; - fn object(&self) -> &Attestation { - self.att + fn intermediate(&self) -> &AttestationRef<'a, T> { + &self.att + } + + fn convert_to_object(att_ref: &AttestationRef<'a, T>) -> Attestation { + att_ref.clone_as_attestation() } fn covering_set(&self) -> &HashMap { @@ -148,7 +152,7 @@ impl<'a, T: EthSpec> MaxCover for AttMaxCover<'a, T> { /// of slashable voting, which is rare. fn update_covering_set( &mut self, - best_att: &Attestation, + best_att: &AttestationRef<'a, T>, covered_validators: &HashMap, ) { if self.att.data.slot == best_att.data.slot && self.att.data.index == best_att.data.index { @@ -172,16 +176,16 @@ impl<'a, T: EthSpec> MaxCover for AttMaxCover<'a, T> { /// /// This isn't optimal, but with the Altair fork this code is obsolete and not worth upgrading. pub fn earliest_attestation_validators( - attestation: &Attestation, + attestation: &AttestationRef, state: &BeaconState, base_state: &BeaconStateBase, ) -> BitList { // Bitfield of validators whose attestations are new/fresh. 
- let mut new_validators = attestation.aggregation_bits.clone(); + let mut new_validators = attestation.indexed.aggregation_bits.clone(); - let state_attestations = if attestation.data.target.epoch == state.current_epoch() { + let state_attestations = if attestation.checkpoint.target_epoch == state.current_epoch() { &base_state.current_epoch_attestations - } else if attestation.data.target.epoch == state.previous_epoch() { + } else if attestation.checkpoint.target_epoch == state.previous_epoch() { &base_state.previous_epoch_attestations } else { return BitList::with_capacity(0).unwrap(); diff --git a/beacon_node/operation_pool/src/attestation_id.rs b/beacon_node/operation_pool/src/attestation_id.rs index f496ecb3a3..b65975787e 100644 --- a/beacon_node/operation_pool/src/attestation_id.rs +++ b/beacon_node/operation_pool/src/attestation_id.rs @@ -1,45 +1,12 @@ use serde_derive::{Deserialize, Serialize}; -use ssz::ssz_encode; use ssz_derive::{Decode, Encode}; -use types::{AttestationData, ChainSpec, Domain, Epoch, Fork, Hash256}; /// Serialized `AttestationData` augmented with a domain to encode the fork info. +/// +/// [DEPRECATED] To be removed once all nodes have updated to schema v12. #[derive( PartialEq, Eq, Clone, Hash, Debug, PartialOrd, Ord, Encode, Decode, Serialize, Deserialize, )] pub struct AttestationId { v: Vec, } - -/// Number of domain bytes that the end of an attestation ID is padded with. 
-const DOMAIN_BYTES_LEN: usize = std::mem::size_of::(); - -impl AttestationId { - pub fn from_data( - attestation: &AttestationData, - fork: &Fork, - genesis_validators_root: Hash256, - spec: &ChainSpec, - ) -> Self { - let mut bytes = ssz_encode(attestation); - let epoch = attestation.target.epoch; - bytes.extend_from_slice( - AttestationId::compute_domain_bytes(epoch, fork, genesis_validators_root, spec) - .as_bytes(), - ); - AttestationId { v: bytes } - } - - pub fn compute_domain_bytes( - epoch: Epoch, - fork: &Fork, - genesis_validators_root: Hash256, - spec: &ChainSpec, - ) -> Hash256 { - spec.get_domain(epoch, Domain::BeaconAttester, fork, genesis_validators_root) - } - - pub fn domain_bytes_match(&self, domain_bytes: &Hash256) -> bool { - &self.v[self.v.len() - DOMAIN_BYTES_LEN..] == domain_bytes.as_bytes() - } -} diff --git a/beacon_node/operation_pool/src/attestation_storage.rs b/beacon_node/operation_pool/src/attestation_storage.rs new file mode 100644 index 0000000000..0fb9bafd82 --- /dev/null +++ b/beacon_node/operation_pool/src/attestation_storage.rs @@ -0,0 +1,245 @@ +use crate::AttestationStats; +use itertools::Itertools; +use std::collections::HashMap; +use types::{ + AggregateSignature, Attestation, AttestationData, BeaconState, BitList, Checkpoint, Epoch, + EthSpec, Hash256, Slot, +}; + +#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] +pub struct CheckpointKey { + pub source: Checkpoint, + pub target_epoch: Epoch, +} + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct CompactAttestationData { + pub slot: Slot, + pub index: u64, + pub beacon_block_root: Hash256, + pub target_root: Hash256, +} + +#[derive(Debug, PartialEq)] +pub struct CompactIndexedAttestation { + pub attesting_indices: Vec, + pub aggregation_bits: BitList, + pub signature: AggregateSignature, +} + +#[derive(Debug)] +pub struct SplitAttestation { + pub checkpoint: CheckpointKey, + pub data: CompactAttestationData, + pub indexed: CompactIndexedAttestation, +} + +#[derive(Debug, 
Clone)] +pub struct AttestationRef<'a, T: EthSpec> { + pub checkpoint: &'a CheckpointKey, + pub data: &'a CompactAttestationData, + pub indexed: &'a CompactIndexedAttestation, +} + +#[derive(Debug, Default, PartialEq)] +pub struct AttestationMap { + checkpoint_map: HashMap>, +} + +#[derive(Debug, Default, PartialEq)] +pub struct AttestationDataMap { + attestations: HashMap>>, +} + +impl SplitAttestation { + pub fn new(attestation: Attestation, attesting_indices: Vec) -> Self { + let checkpoint = CheckpointKey { + source: attestation.data.source, + target_epoch: attestation.data.target.epoch, + }; + let data = CompactAttestationData { + slot: attestation.data.slot, + index: attestation.data.index, + beacon_block_root: attestation.data.beacon_block_root, + target_root: attestation.data.target.root, + }; + let indexed = CompactIndexedAttestation { + attesting_indices, + aggregation_bits: attestation.aggregation_bits, + signature: attestation.signature, + }; + Self { + checkpoint, + data, + indexed, + } + } + + pub fn as_ref(&self) -> AttestationRef { + AttestationRef { + checkpoint: &self.checkpoint, + data: &self.data, + indexed: &self.indexed, + } + } +} + +impl<'a, T: EthSpec> AttestationRef<'a, T> { + pub fn attestation_data(&self) -> AttestationData { + AttestationData { + slot: self.data.slot, + index: self.data.index, + beacon_block_root: self.data.beacon_block_root, + source: self.checkpoint.source, + target: Checkpoint { + epoch: self.checkpoint.target_epoch, + root: self.data.target_root, + }, + } + } + + pub fn clone_as_attestation(&self) -> Attestation { + Attestation { + aggregation_bits: self.indexed.aggregation_bits.clone(), + data: self.attestation_data(), + signature: self.indexed.signature.clone(), + } + } +} + +impl CheckpointKey { + /// Return two checkpoint keys: `(previous, current)` for the previous and current epochs of + /// the `state`. 
+ pub fn keys_for_state(state: &BeaconState) -> (Self, Self) { + ( + CheckpointKey { + source: state.previous_justified_checkpoint(), + target_epoch: state.previous_epoch(), + }, + CheckpointKey { + source: state.current_justified_checkpoint(), + target_epoch: state.current_epoch(), + }, + ) + } +} + +impl CompactIndexedAttestation { + pub fn signers_disjoint_from(&self, other: &Self) -> bool { + self.aggregation_bits + .intersection(&other.aggregation_bits) + .is_zero() + } + + pub fn aggregate(&mut self, other: &Self) { + self.attesting_indices = self + .attesting_indices + .drain(..) + .merge(other.attesting_indices.iter().copied()) + .dedup() + .collect(); + self.aggregation_bits = self.aggregation_bits.union(&other.aggregation_bits); + self.signature.add_assign_aggregate(&other.signature); + } +} + +impl AttestationMap { + pub fn insert(&mut self, attestation: Attestation, attesting_indices: Vec) { + let SplitAttestation { + checkpoint, + data, + indexed, + } = SplitAttestation::new(attestation, attesting_indices); + + let attestation_map = self + .checkpoint_map + .entry(checkpoint) + .or_insert_with(AttestationDataMap::default); + let attestations = attestation_map + .attestations + .entry(data) + .or_insert_with(Vec::new); + + // Greedily aggregate the attestation with all existing attestations. + // NOTE: this is sub-optimal and in future we will remove this in favour of max-clique + // aggregation. + let mut aggregated = false; + for existing_attestation in attestations.iter_mut() { + if existing_attestation.signers_disjoint_from(&indexed) { + existing_attestation.aggregate(&indexed); + aggregated = true; + } else if *existing_attestation == indexed { + aggregated = true; + } + } + + if !aggregated { + attestations.push(indexed); + } + } + + /// Iterate all attestations matching the given `checkpoint_key`. 
+ pub fn get_attestations<'a>( + &'a self, + checkpoint_key: &'a CheckpointKey, + ) -> impl Iterator> + 'a { + self.checkpoint_map + .get(checkpoint_key) + .into_iter() + .flat_map(|attestation_map| attestation_map.iter(checkpoint_key)) + } + + /// Iterate all attestations in the map. + pub fn iter(&self) -> impl Iterator> { + self.checkpoint_map + .iter() + .flat_map(|(checkpoint_key, attestation_map)| attestation_map.iter(checkpoint_key)) + } + + /// Prune attestations that are from before the previous epoch. + pub fn prune(&mut self, current_epoch: Epoch) { + self.checkpoint_map + .retain(|checkpoint_key, _| current_epoch <= checkpoint_key.target_epoch + 1); + } + + /// Statistics about all attestations stored in the map. + pub fn stats(&self) -> AttestationStats { + self.checkpoint_map + .values() + .map(AttestationDataMap::stats) + .fold(AttestationStats::default(), |mut acc, new| { + acc.num_attestations += new.num_attestations; + acc.num_attestation_data += new.num_attestation_data; + acc.max_aggregates_per_data = + std::cmp::max(acc.max_aggregates_per_data, new.max_aggregates_per_data); + acc + }) + } +} + +impl AttestationDataMap { + pub fn iter<'a>( + &'a self, + checkpoint_key: &'a CheckpointKey, + ) -> impl Iterator> + 'a { + self.attestations.iter().flat_map(|(data, vec_indexed)| { + vec_indexed.iter().map(|indexed| AttestationRef { + checkpoint: checkpoint_key, + data, + indexed, + }) + }) + } + + pub fn stats(&self) -> AttestationStats { + let mut stats = AttestationStats::default(); + + for aggregates in self.attestations.values() { + stats.num_attestations += aggregates.len(); + stats.num_attestation_data += 1; + stats.max_aggregates_per_data = + std::cmp::max(stats.max_aggregates_per_data, aggregates.len()); + } + stats + } +} diff --git a/beacon_node/operation_pool/src/attester_slashing.rs b/beacon_node/operation_pool/src/attester_slashing.rs index 2cb63ad252..f5916384d4 100644 --- a/beacon_node/operation_pool/src/attester_slashing.rs +++ 
b/beacon_node/operation_pool/src/attester_slashing.rs @@ -39,14 +39,18 @@ impl<'a, T: EthSpec> AttesterSlashingMaxCover<'a, T> { impl<'a, T: EthSpec> MaxCover for AttesterSlashingMaxCover<'a, T> { /// The result type, of which we would eventually like a collection of maximal quality. type Object = AttesterSlashing; + type Intermediate = AttesterSlashing; /// The type used to represent sets. type Set = HashMap; - /// Extract an object for inclusion in a solution. - fn object(&self) -> &AttesterSlashing { + fn intermediate(&self) -> &AttesterSlashing { self.slashing } + fn convert_to_object(slashing: &AttesterSlashing) -> AttesterSlashing { + slashing.clone() + } + /// Get the set of elements covered. fn covering_set(&self) -> &HashMap { &self.effective_balances diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index 771dca12f6..8c335189c6 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -1,34 +1,38 @@ mod attestation; mod attestation_id; +mod attestation_storage; mod attester_slashing; mod max_cover; mod metrics; mod persistence; +mod reward_cache; mod sync_aggregate_id; pub use attestation::AttMaxCover; +pub use attestation_storage::{AttestationRef, SplitAttestation}; pub use max_cover::MaxCover; -pub use persistence::{PersistedOperationPool, PersistedOperationPoolAltair}; +pub use persistence::{ + PersistedOperationPool, PersistedOperationPoolV12, PersistedOperationPoolV5, +}; +pub use reward_cache::RewardCache; +use crate::attestation_storage::{AttestationMap, CheckpointKey}; use crate::sync_aggregate_id::SyncAggregateId; -use attestation_id::AttestationId; use attester_slashing::AttesterSlashingMaxCover; use max_cover::maximum_cover; -use parking_lot::RwLock; +use parking_lot::{RwLock, RwLockWriteGuard}; use state_processing::per_block_processing::errors::AttestationValidationError; use state_processing::per_block_processing::{ - get_slashable_indices_modular, 
verify_attestation_for_block_inclusion, verify_exit, - VerifySignatures, + get_slashable_indices_modular, verify_exit, VerifySignatures, }; -use state_processing::SigVerifiedOp; +use state_processing::{SigVerifiedOp, VerifyOperation}; use std::collections::{hash_map::Entry, HashMap, HashSet}; use std::marker::PhantomData; use std::ptr; use types::{ - sync_aggregate::Error as SyncAggregateError, typenum::Unsigned, Attestation, AttesterSlashing, - BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, ForkVersion, Hash256, - ProposerSlashing, SignedVoluntaryExit, Slot, SyncAggregate, SyncCommitteeContribution, - Validator, + sync_aggregate::Error as SyncAggregateError, typenum::Unsigned, Attestation, AttestationData, + AttesterSlashing, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, ProposerSlashing, + SignedVoluntaryExit, Slot, SyncAggregate, SyncCommitteeContribution, Validator, }; type SyncContributions = RwLock>>>; @@ -36,15 +40,17 @@ type SyncContributions = RwLock { /// Map from attestation ID (see below) to vectors of attestations. - attestations: RwLock>>>, + attestations: RwLock>, /// Map from sync aggregate ID to the best `SyncCommitteeContribution`s seen for that ID. sync_contributions: SyncContributions, /// Set of attester slashings, and the fork version they were verified against. - attester_slashings: RwLock, ForkVersion)>>, + attester_slashings: RwLock, T>>>, /// Map from proposer index to slashing. - proposer_slashings: RwLock>, + proposer_slashings: RwLock>>, /// Map from exiting validator to their exit data. - voluntary_exits: RwLock>, + voluntary_exits: RwLock>>, + /// Reward cache for accelerating attestation packing. 
+ reward_cache: RwLock, _phantom: PhantomData, } @@ -53,9 +59,16 @@ pub enum OpPoolError { GetAttestationsTotalBalanceError(BeaconStateError), GetBlockRootError(BeaconStateError), SyncAggregateError(SyncAggregateError), + RewardCacheUpdatePrevEpoch(BeaconStateError), + RewardCacheUpdateCurrEpoch(BeaconStateError), + RewardCacheGetBlockRoot(BeaconStateError), + RewardCacheWrongEpoch, + RewardCacheValidatorUnknown(BeaconStateError), + RewardCacheOutOfBounds, IncorrectOpPoolVariant, } +#[derive(Default)] pub struct AttestationStats { /// Total number of attestations for all committeees/indices/votes. pub num_attestations: usize, @@ -176,95 +189,45 @@ impl OperationPool { pub fn insert_attestation( &self, attestation: Attestation, - fork: &Fork, - genesis_validators_root: Hash256, - spec: &ChainSpec, + attesting_indices: Vec, ) -> Result<(), AttestationValidationError> { - let id = AttestationId::from_data(&attestation.data, fork, genesis_validators_root, spec); - - // Take a write lock on the attestations map. - let mut attestations = self.attestations.write(); - - let existing_attestations = match attestations.entry(id) { - Entry::Vacant(entry) => { - entry.insert(vec![attestation]); - return Ok(()); - } - Entry::Occupied(entry) => entry.into_mut(), - }; - - let mut aggregated = false; - for existing_attestation in existing_attestations.iter_mut() { - if existing_attestation.signers_disjoint_from(&attestation) { - existing_attestation.aggregate(&attestation); - aggregated = true; - } else if *existing_attestation == attestation { - aggregated = true; - } - } - - if !aggregated { - existing_attestations.push(attestation); - } - + self.attestations + .write() + .insert(attestation, attesting_indices); Ok(()) } /// Total number of attestations in the pool, including attestations for the same data. 
pub fn num_attestations(&self) -> usize { - self.attestations.read().values().map(Vec::len).sum() + self.attestation_stats().num_attestations } pub fn attestation_stats(&self) -> AttestationStats { - let mut num_attestations = 0; - let mut num_attestation_data = 0; - let mut max_aggregates_per_data = 0; - - for aggregates in self.attestations.read().values() { - num_attestations += aggregates.len(); - num_attestation_data += 1; - max_aggregates_per_data = std::cmp::max(max_aggregates_per_data, aggregates.len()); - } - AttestationStats { - num_attestations, - num_attestation_data, - max_aggregates_per_data, - } + self.attestations.read().stats() } /// Return all valid attestations for the given epoch, for use in max cover. + #[allow(clippy::too_many_arguments)] fn get_valid_attestations_for_epoch<'a>( &'a self, - epoch: Epoch, - all_attestations: &'a HashMap>>, + checkpoint_key: &'a CheckpointKey, + all_attestations: &'a AttestationMap, state: &'a BeaconState, + reward_cache: &'a RewardCache, total_active_balance: u64, - validity_filter: impl FnMut(&&Attestation) -> bool + Send, + validity_filter: impl FnMut(&AttestationRef<'a, T>) -> bool + Send, spec: &'a ChainSpec, ) -> impl Iterator> + Send { - let domain_bytes = AttestationId::compute_domain_bytes( - epoch, - &state.fork(), - state.genesis_validators_root(), - spec, - ); all_attestations - .iter() - .filter(move |(key, _)| key.domain_bytes_match(&domain_bytes)) - .flat_map(|(_, attestations)| attestations) - .filter(move |attestation| attestation.data.target.epoch == epoch) - .filter(move |attestation| { - // Ensure attestations are valid for block inclusion - verify_attestation_for_block_inclusion( - state, - attestation, - VerifySignatures::False, - spec, - ) - .is_ok() + .get_attestations(checkpoint_key) + .filter(|att| { + att.data.slot + spec.min_attestation_inclusion_delay <= state.slot() + && state.slot() <= att.data.slot + T::slots_per_epoch() }) .filter(validity_filter) - .filter_map(move |att| 
AttMaxCover::new(att, state, total_active_balance, spec)) + .filter_map(move |att| { + AttMaxCover::new(att, state, reward_cache, total_active_balance, spec) + }) } /// Get a list of attestations for inclusion in a block. @@ -276,18 +239,24 @@ impl OperationPool { pub fn get_attestations( &self, state: &BeaconState, - prev_epoch_validity_filter: impl FnMut(&&Attestation) -> bool + Send, - curr_epoch_validity_filter: impl FnMut(&&Attestation) -> bool + Send, + prev_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send, + curr_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send, spec: &ChainSpec, ) -> Result>, OpPoolError> { // Attestations for the current fork, which may be from the current or previous epoch. - let prev_epoch = state.previous_epoch(); - let current_epoch = state.current_epoch(); + let (prev_epoch_key, curr_epoch_key) = CheckpointKey::keys_for_state(state); let all_attestations = self.attestations.read(); let total_active_balance = state .get_total_active_balance() .map_err(OpPoolError::GetAttestationsTotalBalanceError)?; + // Update the reward cache. + let reward_timer = metrics::start_timer(&metrics::BUILD_REWARD_CACHE_TIME); + let mut reward_cache = self.reward_cache.write(); + reward_cache.update(state)?; + let reward_cache = RwLockWriteGuard::downgrade(reward_cache); + drop(reward_timer); + // Split attestations for the previous & current epochs, so that we // can optimise them individually in parallel. 
let mut num_prev_valid = 0_i64; @@ -295,9 +264,10 @@ impl OperationPool { let prev_epoch_att = self .get_valid_attestations_for_epoch( - prev_epoch, + &prev_epoch_key, &*all_attestations, state, + &*reward_cache, total_active_balance, prev_epoch_validity_filter, spec, @@ -305,9 +275,10 @@ impl OperationPool { .inspect(|_| num_prev_valid += 1); let curr_epoch_att = self .get_valid_attestations_for_epoch( - current_epoch, + &curr_epoch_key, &*all_attestations, state, + &*reward_cache, total_active_balance, curr_epoch_validity_filter, spec, @@ -328,7 +299,7 @@ impl OperationPool { move || { let _timer = metrics::start_timer(&metrics::ATTESTATION_PREV_EPOCH_PACKING_TIME); // If we're in the genesis epoch, just use the current epoch attestations. - if prev_epoch == current_epoch { + if prev_epoch_key == curr_epoch_key { vec![] } else { maximum_cover(prev_epoch_att, prev_epoch_limit, "prev_epoch_attestations") @@ -356,36 +327,26 @@ impl OperationPool { /// Remove attestations which are too old to be included in a block. pub fn prune_attestations(&self, current_epoch: Epoch) { - // Prune attestations that are from before the previous epoch. - self.attestations.write().retain(|_, attestations| { - // All the attestations in this bucket have the same data, so we only need to - // check the first one. - attestations - .first() - .map_or(false, |att| current_epoch <= att.data.target.epoch + 1) - }); + self.attestations.write().prune(current_epoch); } /// Insert a proposer slashing into the pool. pub fn insert_proposer_slashing( &self, - verified_proposer_slashing: SigVerifiedOp, + verified_proposer_slashing: SigVerifiedOp, ) { - let slashing = verified_proposer_slashing.into_inner(); - self.proposer_slashings - .write() - .insert(slashing.signed_header_1.message.proposer_index, slashing); + self.proposer_slashings.write().insert( + verified_proposer_slashing.as_inner().proposer_index(), + verified_proposer_slashing, + ); } /// Insert an attester slashing into the pool. 
pub fn insert_attester_slashing( &self, - verified_slashing: SigVerifiedOp>, - fork: Fork, + verified_slashing: SigVerifiedOp, T>, ) { - self.attester_slashings - .write() - .insert((verified_slashing.into_inner(), fork.current_version)); + self.attester_slashings.write().insert(verified_slashing); } /// Get proposer and attester slashings for inclusion in a block. @@ -405,11 +366,13 @@ impl OperationPool { let proposer_slashings = filter_limit_operations( self.proposer_slashings.read().values(), |slashing| { - state - .validators() - .get(slashing.signed_header_1.message.proposer_index as usize) - .map_or(false, |validator| !validator.slashed) + slashing.signature_is_still_valid(&state.fork()) + && state + .validators() + .get(slashing.as_inner().signed_header_1.message.proposer_index as usize) + .map_or(false, |validator| !validator.slashed) }, + |slashing| slashing.as_inner().clone(), T::MaxProposerSlashings::to_usize(), ); @@ -417,30 +380,10 @@ impl OperationPool { // slashings. let mut to_be_slashed = proposer_slashings .iter() - .map(|s| s.signed_header_1.message.proposer_index) - .collect::>(); + .map(|s| s.proposer_index()) + .collect(); - let reader = self.attester_slashings.read(); - - let relevant_attester_slashings = reader.iter().flat_map(|(slashing, fork)| { - if *fork == state.fork().previous_version || *fork == state.fork().current_version { - AttesterSlashingMaxCover::new(slashing, &to_be_slashed, state) - } else { - None - } - }); - - let attester_slashings = maximum_cover( - relevant_attester_slashings, - T::MaxAttesterSlashings::to_usize(), - "attester_slashings", - ) - .into_iter() - .map(|cover| { - to_be_slashed.extend(cover.covering_set().keys()); - cover.object().clone() - }) - .collect(); + let attester_slashings = self.get_attester_slashings(state, &mut to_be_slashed); let voluntary_exits = self.get_voluntary_exits( state, @@ -451,6 +394,37 @@ impl OperationPool { (proposer_slashings, attester_slashings, voluntary_exits) } + /// Get 
attester slashings taking into account already slashed validators. + /// + /// This function *must* remain private. + fn get_attester_slashings( + &self, + state: &BeaconState, + to_be_slashed: &mut HashSet, + ) -> Vec> { + let reader = self.attester_slashings.read(); + + let relevant_attester_slashings = reader.iter().flat_map(|slashing| { + if slashing.signature_is_still_valid(&state.fork()) { + AttesterSlashingMaxCover::new(slashing.as_inner(), to_be_slashed, state) + } else { + None + } + }); + + maximum_cover( + relevant_attester_slashings, + T::MaxAttesterSlashings::to_usize(), + "attester_slashings", + ) + .into_iter() + .map(|cover| { + to_be_slashed.extend(cover.covering_set().keys()); + cover.intermediate().clone() + }) + .collect() + } + /// Prune proposer slashings for validators which are exited in the finalized epoch. pub fn prune_proposer_slashings(&self, head_state: &BeaconState) { prune_validator_hash_map( @@ -463,30 +437,23 @@ impl OperationPool { /// Prune attester slashings for all slashed or withdrawn validators, or attestations on another /// fork. pub fn prune_attester_slashings(&self, head_state: &BeaconState) { - self.attester_slashings - .write() - .retain(|(slashing, fork_version)| { - let previous_fork_is_finalized = - head_state.finalized_checkpoint().epoch >= head_state.fork().epoch; - // Prune any slashings which don't match the current fork version, or the previous - // fork version if it is not finalized yet. - let fork_ok = (*fork_version == head_state.fork().current_version) - || (*fork_version == head_state.fork().previous_version - && !previous_fork_is_finalized); - // Slashings that don't slash any validators can also be dropped. - let slashing_ok = - get_slashable_indices_modular(head_state, slashing, |_, validator| { - // Declare that a validator is still slashable if they have not exited prior - // to the finalized epoch. 
- // - // We cannot check the `slashed` field since the `head` is not finalized and - // a fork could un-slash someone. - validator.exit_epoch > head_state.finalized_checkpoint().epoch - }) - .map_or(false, |indices| !indices.is_empty()); + self.attester_slashings.write().retain(|slashing| { + // Check that the attestation's signature is still valid wrt the fork version. + let signature_ok = slashing.signature_is_still_valid(&head_state.fork()); + // Slashings that don't slash any validators can also be dropped. + let slashing_ok = + get_slashable_indices_modular(head_state, slashing.as_inner(), |_, validator| { + // Declare that a validator is still slashable if they have not exited prior + // to the finalized epoch. + // + // We cannot check the `slashed` field since the `head` is not finalized and + // a fork could un-slash someone. + validator.exit_epoch > head_state.finalized_checkpoint().epoch + }) + .map_or(false, |indices| !indices.is_empty()); - fork_ok && slashing_ok - }); + signature_ok && slashing_ok + }); } /// Total number of attester slashings in the pool. @@ -500,11 +467,10 @@ impl OperationPool { } /// Insert a voluntary exit that has previously been checked elsewhere. - pub fn insert_voluntary_exit(&self, verified_exit: SigVerifiedOp) { - let exit = verified_exit.into_inner(); + pub fn insert_voluntary_exit(&self, exit: SigVerifiedOp) { self.voluntary_exits .write() - .insert(exit.message.validator_index, exit); + .insert(exit.as_inner().message.validator_index, exit); } /// Get a list of voluntary exits for inclusion in a block. 
@@ -519,7 +485,12 @@ impl OperationPool { { filter_limit_operations( self.voluntary_exits.read().values(), - |exit| filter(exit) && verify_exit(state, exit, VerifySignatures::False, spec).is_ok(), + |exit| { + filter(exit.as_inner()) + && exit.signature_is_still_valid(&state.fork()) + && verify_exit(state, exit.as_inner(), VerifySignatures::False, spec).is_ok() + }, + |exit| exit.as_inner().clone(), T::MaxVoluntaryExits::to_usize(), ) } @@ -558,8 +529,8 @@ impl OperationPool { pub fn get_all_attestations(&self) -> Vec> { self.attestations .read() - .values() - .flat_map(|attns| attns.iter().cloned()) + .iter() + .map(|att| att.clone_as_attestation()) .collect() } @@ -568,14 +539,13 @@ impl OperationPool { /// This method may return objects that are invalid for block inclusion. pub fn get_filtered_attestations(&self, filter: F) -> Vec> where - F: Fn(&Attestation) -> bool, + F: Fn(&AttestationData) -> bool, { self.attestations .read() - .values() - .flat_map(|attns| attns.iter()) - .filter(|attn| filter(*attn)) - .cloned() + .iter() + .filter(|att| filter(&att.attestation_data())) + .map(|att| att.clone_as_attestation()) .collect() } @@ -586,7 +556,7 @@ impl OperationPool { self.attester_slashings .read() .iter() - .map(|(slashing, _)| slashing.clone()) + .map(|slashing| slashing.as_inner().clone()) .collect() } @@ -597,7 +567,7 @@ impl OperationPool { self.proposer_slashings .read() .iter() - .map(|(_, slashing)| slashing.clone()) + .map(|(_, slashing)| slashing.as_inner().clone()) .collect() } @@ -608,23 +578,29 @@ impl OperationPool { self.voluntary_exits .read() .iter() - .map(|(_, exit)| exit.clone()) + .map(|(_, exit)| exit.as_inner().clone()) .collect() } } /// Filter up to a maximum number of operations out of an iterator. 
-fn filter_limit_operations<'a, T: 'a, I, F>(operations: I, filter: F, limit: usize) -> Vec +fn filter_limit_operations<'a, T: 'a, V: 'a, I, F, G>( + operations: I, + filter: F, + mapping: G, + limit: usize, +) -> Vec where I: IntoIterator, F: Fn(&T) -> bool, + G: Fn(&T) -> V, T: Clone, { operations .into_iter() .filter(|x| filter(*x)) .take(limit) - .cloned() + .map(mapping) .collect() } @@ -634,17 +610,19 @@ where /// in the state's validator registry and then passed to `prune_if`. /// Entries for unknown validators will be kept. fn prune_validator_hash_map( - map: &mut HashMap, + map: &mut HashMap>, prune_if: F, head_state: &BeaconState, ) where F: Fn(&Validator) -> bool, + T: VerifyOperation, { - map.retain(|&validator_index, _| { - head_state - .validators() - .get(validator_index as usize) - .map_or(true, |validator| !prune_if(validator)) + map.retain(|&validator_index, op| { + op.signature_is_still_valid(&head_state.fork()) + && head_state + .validators() + .get(validator_index as usize) + .map_or(true, |validator| !prune_if(validator)) }); } @@ -655,6 +633,7 @@ impl PartialEq for OperationPool { return true; } *self.attestations.read() == *other.attestations.read() + && *self.sync_contributions.read() == *other.sync_contributions.read() && *self.attester_slashings.read() == *other.attester_slashings.read() && *self.proposer_slashings.read() == *other.proposer_slashings.read() && *self.voluntary_exits.read() == *other.voluntary_exits.read() @@ -669,7 +648,8 @@ mod release_tests { test_spec, BeaconChainHarness, EphemeralHarnessType, RelativeSyncCommittee, }; use lazy_static::lazy_static; - use state_processing::VerifyOperation; + use maplit::hashset; + use state_processing::{common::get_attesting_indices_from_state, VerifyOperation}; use std::collections::BTreeSet; use types::consts::altair::SYNC_COMMITTEE_SUBNET_COUNT; use types::*; @@ -689,6 +669,7 @@ mod release_tests { .spec_or_default(spec) .keypairs(KEYPAIRS[0..validator_count].to_vec()) 
.fresh_ephemeral_store() + .mock_execution_layer() .build(); harness.advance_slot(); @@ -714,7 +695,6 @@ mod release_tests { num_committees: usize, ) -> (BeaconChainHarness>, ChainSpec) { let mut spec = E::default_spec(); - spec.altair_fork_epoch = Some(Epoch::new(0)); let num_validators = @@ -780,10 +760,19 @@ mod release_tests { }) .unwrap(); + let att1_indices = get_attesting_indices_from_state(&state, &att1).unwrap(); + let att2_indices = get_attesting_indices_from_state(&state, &att2).unwrap(); + let att1_split = SplitAttestation::new(att1.clone(), att1_indices); + let att2_split = SplitAttestation::new(att2.clone(), att2_indices); + assert_eq!( att1.aggregation_bits.num_set_bits(), - earliest_attestation_validators(&att1, &state, state.as_base().unwrap()) - .num_set_bits() + earliest_attestation_validators( + &att1_split.as_ref(), + &state, + state.as_base().unwrap() + ) + .num_set_bits() ); state @@ -800,8 +789,12 @@ mod release_tests { assert_eq!( committees.get(0).unwrap().committee.len() - 2, - earliest_attestation_validators(&att2, &state, state.as_base().unwrap()) - .num_set_bits() + earliest_attestation_validators( + &att2_split.as_ref(), + &state, + state.as_base().unwrap() + ) + .num_set_bits() ); } } @@ -840,14 +833,12 @@ mod release_tests { ); for (atts, _) in attestations { - for att in atts.into_iter() { - op_pool - .insert_attestation(att.0, &state.fork(), state.genesis_validators_root(), spec) - .unwrap(); + for (att, _) in atts { + let attesting_indices = get_attesting_indices_from_state(&state, &att).unwrap(); + op_pool.insert_attestation(att, attesting_indices).unwrap(); } } - assert_eq!(op_pool.attestations.read().len(), committees.len()); assert_eq!(op_pool.num_attestations(), committees.len()); // Before the min attestation inclusion delay, get_attestations shouldn't return anything. 
@@ -913,17 +904,11 @@ mod release_tests { for (_, aggregate) in attestations { let att = aggregate.unwrap().message.aggregate; + let attesting_indices = get_attesting_indices_from_state(&state, &att).unwrap(); op_pool - .insert_attestation( - att.clone(), - &state.fork(), - state.genesis_validators_root(), - spec, - ) - .unwrap(); - op_pool - .insert_attestation(att, &state.fork(), state.genesis_validators_root(), spec) + .insert_attestation(att.clone(), attesting_indices.clone()) .unwrap(); + op_pool.insert_attestation(att, attesting_indices).unwrap(); } assert_eq!(op_pool.num_attestations(), committees.len()); @@ -1007,16 +992,17 @@ mod release_tests { .collect::>(); for att in aggs1.into_iter().chain(aggs2.into_iter()) { - op_pool - .insert_attestation(att, &state.fork(), state.genesis_validators_root(), spec) - .unwrap(); + let attesting_indices = get_attesting_indices_from_state(&state, &att).unwrap(); + op_pool.insert_attestation(att, attesting_indices).unwrap(); } } // The attestations should get aggregated into two attestations that comprise all // validators. 
- assert_eq!(op_pool.attestations.read().len(), committees.len()); - assert_eq!(op_pool.num_attestations(), 2 * committees.len()); + let stats = op_pool.attestation_stats(); + assert_eq!(stats.num_attestation_data, committees.len()); + assert_eq!(stats.num_attestations, 2 * committees.len()); + assert_eq!(stats.max_aggregates_per_data, 2); } /// Create a bunch of attestations signed by a small number of validators, and another @@ -1078,9 +1064,8 @@ mod release_tests { .collect::>(); for att in aggs { - op_pool - .insert_attestation(att, &state.fork(), state.genesis_validators_root(), spec) - .unwrap(); + let attesting_indices = get_attesting_indices_from_state(&state, &att).unwrap(); + op_pool.insert_attestation(att, attesting_indices).unwrap(); } }; @@ -1095,12 +1080,13 @@ mod release_tests { let num_small = target_committee_size / small_step_size; let num_big = target_committee_size / big_step_size; - assert_eq!(op_pool.attestations.read().len(), committees.len()); + let stats = op_pool.attestation_stats(); + assert_eq!(stats.num_attestation_data, committees.len()); assert_eq!( - op_pool.num_attestations(), + stats.num_attestations, (num_small + num_big) * committees.len() ); - assert!(op_pool.num_attestations() > max_attestations); + assert!(stats.num_attestations > max_attestations); *state.slot_mut() += spec.min_attestation_inclusion_delay; let best_attestations = op_pool @@ -1173,9 +1159,8 @@ mod release_tests { .collect::>(); for att in aggs { - op_pool - .insert_attestation(att, &state.fork(), state.genesis_validators_root(), spec) - .unwrap(); + let attesting_indices = get_attesting_indices_from_state(&state, &att).unwrap(); + op_pool.insert_attestation(att, attesting_indices).unwrap(); } }; @@ -1190,7 +1175,10 @@ mod release_tests { let num_small = target_committee_size / small_step_size; let num_big = target_committee_size / big_step_size; - assert_eq!(op_pool.attestations.read().len(), committees.len()); + assert_eq!( + 
op_pool.attestation_stats().num_attestation_data, + committees.len() + ); assert_eq!( op_pool.num_attestations(), (num_small + num_big) * committees.len() @@ -1210,11 +1198,21 @@ mod release_tests { // Used for asserting that rewards are in decreasing order. let mut prev_reward = u64::max_value(); - for att in &best_attestations { - let mut fresh_validators_rewards = - AttMaxCover::new(att, &state, total_active_balance, spec) - .unwrap() - .fresh_validators_rewards; + let mut reward_cache = RewardCache::default(); + reward_cache.update(&state).unwrap(); + + for att in best_attestations { + let attesting_indices = get_attesting_indices_from_state(&state, &att).unwrap(); + let split_attestation = SplitAttestation::new(att, attesting_indices); + let mut fresh_validators_rewards = AttMaxCover::new( + split_attestation.as_ref(), + &state, + &reward_cache, + total_active_balance, + spec, + ) + .unwrap() + .fresh_validators_rewards; // Remove validators covered by previous attestations. fresh_validators_rewards @@ -1281,10 +1279,7 @@ mod release_tests { let op_pool = OperationPool::::new(); let slashing = harness.make_attester_slashing(vec![1, 3, 5, 7, 9]); - op_pool.insert_attester_slashing( - slashing.clone().validate(&state, spec).unwrap(), - state.fork(), - ); + op_pool.insert_attester_slashing(slashing.clone().validate(&state, spec).unwrap()); op_pool.prune_attester_slashings(&state); assert_eq!( op_pool.get_slashings_and_exits(&state, &harness.spec).1, @@ -1305,22 +1300,10 @@ mod release_tests { let slashing_3 = harness.make_attester_slashing(vec![4, 5, 6]); let slashing_4 = harness.make_attester_slashing(vec![7, 8, 9, 10]); - op_pool.insert_attester_slashing( - slashing_1.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - slashing_2.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - slashing_3.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - 
op_pool.insert_attester_slashing( - slashing_4.clone().validate(&state, spec).unwrap(), - state.fork(), - ); + op_pool.insert_attester_slashing(slashing_1.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(slashing_2.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(slashing_3.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(slashing_4.clone().validate(&state, spec).unwrap()); let best_slashings = op_pool.get_slashings_and_exits(&state, &harness.spec); assert_eq!(best_slashings.1, vec![slashing_4, slashing_3]); @@ -1339,22 +1322,10 @@ mod release_tests { let slashing_3 = harness.make_attester_slashing(vec![5, 6]); let slashing_4 = harness.make_attester_slashing(vec![6]); - op_pool.insert_attester_slashing( - slashing_1.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - slashing_2.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - slashing_3.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - slashing_4.clone().validate(&state, spec).unwrap(), - state.fork(), - ); + op_pool.insert_attester_slashing(slashing_1.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(slashing_2.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(slashing_3.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(slashing_4.clone().validate(&state, spec).unwrap()); let best_slashings = op_pool.get_slashings_and_exits(&state, &harness.spec); assert_eq!(best_slashings.1, vec![slashing_1, slashing_3]); @@ -1374,18 +1345,9 @@ mod release_tests { let a_slashing_3 = harness.make_attester_slashing(vec![5, 6]); op_pool.insert_proposer_slashing(p_slashing.clone().validate(&state, spec).unwrap()); - op_pool.insert_attester_slashing( - a_slashing_1.clone().validate(&state, spec).unwrap(), - state.fork(), - ); 
- op_pool.insert_attester_slashing( - a_slashing_2.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - a_slashing_3.clone().validate(&state, spec).unwrap(), - state.fork(), - ); + op_pool.insert_attester_slashing(a_slashing_1.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(a_slashing_2.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(a_slashing_3.clone().validate(&state, spec).unwrap()); let best_slashings = op_pool.get_slashings_and_exits(&state, &harness.spec); assert_eq!(best_slashings.1, vec![a_slashing_1, a_slashing_3]); @@ -1406,18 +1368,9 @@ mod release_tests { let slashing_2 = harness.make_attester_slashing(vec![5, 6]); let slashing_3 = harness.make_attester_slashing(vec![1, 2, 3]); - op_pool.insert_attester_slashing( - slashing_1.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - slashing_2.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - slashing_3.clone().validate(&state, spec).unwrap(), - state.fork(), - ); + op_pool.insert_attester_slashing(slashing_1.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(slashing_2.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(slashing_3.clone().validate(&state, spec).unwrap()); let best_slashings = op_pool.get_slashings_and_exits(&state, &harness.spec); assert_eq!(best_slashings.1, vec![slashing_1, slashing_3]); @@ -1438,18 +1391,9 @@ mod release_tests { let slashing_2 = harness.make_attester_slashing(vec![4, 5, 6]); let slashing_3 = harness.make_attester_slashing(vec![7, 8]); - op_pool.insert_attester_slashing( - slashing_1.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - slashing_2.clone().validate(&state, spec).unwrap(), - state.fork(), - ); - op_pool.insert_attester_slashing( - 
slashing_3.clone().validate(&state, spec).unwrap(), - state.fork(), - ); + op_pool.insert_attester_slashing(slashing_1.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(slashing_2.clone().validate(&state, spec).unwrap()); + op_pool.insert_attester_slashing(slashing_3.clone().validate(&state, spec).unwrap()); let best_slashings = op_pool.get_slashings_and_exits(&state, &harness.spec); assert_eq!(best_slashings.1, vec![slashing_2, slashing_3]); @@ -1718,4 +1662,289 @@ mod release_tests { expected_bits ); } + + fn cross_fork_harness() -> (BeaconChainHarness>, ChainSpec) + { + let mut spec = test_spec::(); + + // Give some room to sign surround slashings. + spec.altair_fork_epoch = Some(Epoch::new(3)); + spec.bellatrix_fork_epoch = Some(Epoch::new(6)); + + // To make exits immediately valid. + spec.shard_committee_period = 0; + + let num_validators = 32; + + let harness = get_harness::(num_validators, Some(spec.clone())); + (harness, spec) + } + + /// Test several cross-fork voluntary exits: + /// + /// - phase0 exit (not valid after Bellatrix) + /// - phase0 exit signed with Altair fork version (only valid after Bellatrix) + #[tokio::test] + async fn cross_fork_exits() { + let (harness, spec) = cross_fork_harness::(); + let altair_fork_epoch = spec.altair_fork_epoch.unwrap(); + let bellatrix_fork_epoch = spec.bellatrix_fork_epoch.unwrap(); + let slots_per_epoch = MainnetEthSpec::slots_per_epoch(); + + let op_pool = OperationPool::::new(); + + // Sign an exit in phase0 with a phase0 epoch. + let exit1 = harness.make_voluntary_exit(0, Epoch::new(0)); + + // Advance to Altair. + harness + .extend_to_slot(altair_fork_epoch.start_slot(slots_per_epoch)) + .await; + let altair_head = harness.chain.canonical_head.cached_head().snapshot; + assert_eq!(altair_head.beacon_state.current_epoch(), altair_fork_epoch); + + // Add exit 1 to the op pool during Altair. It's still valid at this point and should be + // returned. 
+ let verified_exit1 = exit1 + .clone() + .validate(&altair_head.beacon_state, &harness.chain.spec) + .unwrap(); + op_pool.insert_voluntary_exit(verified_exit1); + let exits = + op_pool.get_voluntary_exits(&altair_head.beacon_state, |_| true, &harness.chain.spec); + assert!(exits.contains(&exit1)); + assert_eq!(exits.len(), 1); + + // Advance to Bellatrix. + harness + .extend_to_slot(bellatrix_fork_epoch.start_slot(slots_per_epoch)) + .await; + let bellatrix_head = harness.chain.canonical_head.cached_head().snapshot; + assert_eq!( + bellatrix_head.beacon_state.current_epoch(), + bellatrix_fork_epoch + ); + + // Sign an exit with the Altair domain and a phase0 epoch. This is a weird type of exit + // that is valid because after the Bellatrix fork we'll use the Altair fork domain to verify + // all prior epochs. + let exit2 = harness.make_voluntary_exit(2, Epoch::new(0)); + let verified_exit2 = exit2 + .clone() + .validate(&bellatrix_head.beacon_state, &harness.chain.spec) + .unwrap(); + op_pool.insert_voluntary_exit(verified_exit2); + + // Attempting to fetch exit1 now should fail, despite it still being in the pool. + // exit2 should still be valid, because it was signed with the Altair fork domain. 
+ assert_eq!(op_pool.voluntary_exits.read().len(), 2); + let exits = + op_pool.get_voluntary_exits(&bellatrix_head.beacon_state, |_| true, &harness.spec); + assert_eq!(&exits, &[exit2]); + } + + /// Test several cross-fork proposer slashings: + /// + /// - phase0 slashing (not valid after Bellatrix) + /// - Bellatrix signed with Altair fork version (not valid after Bellatrix) + /// - phase0 slashing signed with Altair fork version (only valid after Bellatrix) + #[tokio::test] + async fn cross_fork_proposer_slashings() { + let (harness, spec) = cross_fork_harness::(); + let slots_per_epoch = MainnetEthSpec::slots_per_epoch(); + let altair_fork_epoch = spec.altair_fork_epoch.unwrap(); + let bellatrix_fork_epoch = spec.bellatrix_fork_epoch.unwrap(); + let bellatrix_fork_slot = bellatrix_fork_epoch.start_slot(slots_per_epoch); + + let op_pool = OperationPool::::new(); + + // Sign a proposer slashing in phase0 with a phase0 epoch. + let slashing1 = harness.make_proposer_slashing_at_slot(0, Some(Slot::new(1))); + + // Advance to Altair. + harness + .extend_to_slot(altair_fork_epoch.start_slot(slots_per_epoch)) + .await; + let altair_head = harness.chain.canonical_head.cached_head().snapshot; + assert_eq!(altair_head.beacon_state.current_epoch(), altair_fork_epoch); + + // Add slashing1 to the op pool during Altair. It's still valid at this point and should be + // returned. + let verified_slashing1 = slashing1 + .clone() + .validate(&altair_head.beacon_state, &harness.chain.spec) + .unwrap(); + op_pool.insert_proposer_slashing(verified_slashing1); + let (proposer_slashings, _, _) = + op_pool.get_slashings_and_exits(&altair_head.beacon_state, &harness.chain.spec); + assert!(proposer_slashings.contains(&slashing1)); + assert_eq!(proposer_slashings.len(), 1); + + // Sign a proposer slashing with a Bellatrix slot using the Altair fork domain. + // + // This slashing is valid only before the Bellatrix fork epoch. 
+ let slashing2 = harness.make_proposer_slashing_at_slot(1, Some(bellatrix_fork_slot)); + let verified_slashing2 = slashing2 + .clone() + .validate(&altair_head.beacon_state, &harness.chain.spec) + .unwrap(); + op_pool.insert_proposer_slashing(verified_slashing2); + let (proposer_slashings, _, _) = + op_pool.get_slashings_and_exits(&altair_head.beacon_state, &harness.chain.spec); + assert!(proposer_slashings.contains(&slashing1)); + assert!(proposer_slashings.contains(&slashing2)); + assert_eq!(proposer_slashings.len(), 2); + + // Advance to Bellatrix. + harness.extend_to_slot(bellatrix_fork_slot).await; + let bellatrix_head = harness.chain.canonical_head.cached_head().snapshot; + assert_eq!( + bellatrix_head.beacon_state.current_epoch(), + bellatrix_fork_epoch + ); + + // Sign a proposer slashing with the Altair domain and a phase0 slot. This is a weird type + // of slashing that is only valid after the Bellatrix fork because we'll use the Altair fork + // domain to verify all prior epochs. + let slashing3 = harness.make_proposer_slashing_at_slot(2, Some(Slot::new(1))); + let verified_slashing3 = slashing3 + .clone() + .validate(&bellatrix_head.beacon_state, &harness.chain.spec) + .unwrap(); + op_pool.insert_proposer_slashing(verified_slashing3); + + // Attempting to fetch slashing1 now should fail, despite it still being in the pool. + // Likewise slashing2 is also invalid now because it should be signed with the + // Bellatrix fork version. + // slashing3 should still be valid, because it was signed with the Altair fork domain. 
+ assert_eq!(op_pool.proposer_slashings.read().len(), 3); + let (proposer_slashings, _, _) = + op_pool.get_slashings_and_exits(&bellatrix_head.beacon_state, &harness.spec); + assert!(proposer_slashings.contains(&slashing3)); + assert_eq!(proposer_slashings.len(), 1); + } + + /// Test several cross-fork attester slashings: + /// + /// - both target epochs in phase0 (not valid after Bellatrix) + /// - both target epochs in Bellatrix but signed with Altair domain (not valid after Bellatrix) + /// - Altair attestation that surrounds a phase0 attestation (not valid after Bellatrix) + /// - both target epochs in phase0 but signed with Altair domain (only valid after Bellatrix) + #[tokio::test] + async fn cross_fork_attester_slashings() { + let (harness, spec) = cross_fork_harness::(); + let slots_per_epoch = MainnetEthSpec::slots_per_epoch(); + let zero_epoch = Epoch::new(0); + let altair_fork_epoch = spec.altair_fork_epoch.unwrap(); + let bellatrix_fork_epoch = spec.bellatrix_fork_epoch.unwrap(); + let bellatrix_fork_slot = bellatrix_fork_epoch.start_slot(slots_per_epoch); + + let op_pool = OperationPool::::new(); + + // Sign an attester slashing with the phase0 fork version, with both target epochs in phase0. + let slashing1 = harness.make_attester_slashing_with_epochs( + vec![0], + None, + Some(zero_epoch), + None, + Some(zero_epoch), + ); + + // Advance to Altair. + harness + .extend_to_slot(altair_fork_epoch.start_slot(slots_per_epoch)) + .await; + let altair_head = harness.chain.canonical_head.cached_head().snapshot; + assert_eq!(altair_head.beacon_state.current_epoch(), altair_fork_epoch); + + // Add slashing1 to the op pool during Altair. It's still valid at this point and should be + // returned. + let verified_slashing1 = slashing1 + .clone() + .validate(&altair_head.beacon_state, &harness.chain.spec) + .unwrap(); + op_pool.insert_attester_slashing(verified_slashing1); + + // Sign an attester slashing with two Bellatrix epochs using the Altair fork domain. 
+ // + // This slashing is valid only before the Bellatrix fork epoch. + let slashing2 = harness.make_attester_slashing_with_epochs( + vec![1], + None, + Some(bellatrix_fork_epoch), + None, + Some(bellatrix_fork_epoch), + ); + let verified_slashing2 = slashing2 + .clone() + .validate(&altair_head.beacon_state, &harness.chain.spec) + .unwrap(); + op_pool.insert_attester_slashing(verified_slashing2); + let (_, attester_slashings, _) = + op_pool.get_slashings_and_exits(&altair_head.beacon_state, &harness.chain.spec); + assert!(attester_slashings.contains(&slashing1)); + assert!(attester_slashings.contains(&slashing2)); + assert_eq!(attester_slashings.len(), 2); + + // Sign an attester slashing where an Altair attestation surrounds a phase0 one. + // + // This slashing is valid only before the Bellatrix fork epoch. + let slashing3 = harness.make_attester_slashing_with_epochs( + vec![2], + Some(Epoch::new(0)), + Some(altair_fork_epoch), + Some(Epoch::new(1)), + Some(altair_fork_epoch - 1), + ); + let verified_slashing3 = slashing3 + .clone() + .validate(&altair_head.beacon_state, &harness.chain.spec) + .unwrap(); + op_pool.insert_attester_slashing(verified_slashing3); + + // All three slashings should be valid and returned from the pool at this point. + // Seeing as we can only extract 2 at time we'll just pretend that validator 0 is already + // slashed. + let mut to_be_slashed = hashset! {0}; + let attester_slashings = + op_pool.get_attester_slashings(&altair_head.beacon_state, &mut to_be_slashed); + assert!(attester_slashings.contains(&slashing2)); + assert!(attester_slashings.contains(&slashing3)); + assert_eq!(attester_slashings.len(), 2); + + // Advance to Bellatrix. + harness.extend_to_slot(bellatrix_fork_slot).await; + let bellatrix_head = harness.chain.canonical_head.cached_head().snapshot; + assert_eq!( + bellatrix_head.beacon_state.current_epoch(), + bellatrix_fork_epoch + ); + + // Sign an attester slashing with the Altair domain and phase0 epochs. 
This is a weird type + // of slashing that is only valid after the Bellatrix fork because we'll use the Altair fork + // domain to verify all prior epochs. + let slashing4 = harness.make_attester_slashing_with_epochs( + vec![3], + Some(Epoch::new(0)), + Some(altair_fork_epoch - 1), + Some(Epoch::new(0)), + Some(altair_fork_epoch - 1), + ); + let verified_slashing4 = slashing4 + .clone() + .validate(&bellatrix_head.beacon_state, &harness.chain.spec) + .unwrap(); + op_pool.insert_attester_slashing(verified_slashing4); + + // All slashings except slashing4 are now invalid (despite being present in the pool). + assert_eq!(op_pool.attester_slashings.read().len(), 4); + let (_, attester_slashings, _) = + op_pool.get_slashings_and_exits(&bellatrix_head.beacon_state, &harness.spec); + assert!(attester_slashings.contains(&slashing4)); + assert_eq!(attester_slashings.len(), 1); + + // Pruning the attester slashings should remove all but slashing4. + op_pool.prune_attester_slashings(&bellatrix_head.beacon_state); + assert_eq!(op_pool.attester_slashings.read().len(), 1); + } } diff --git a/beacon_node/operation_pool/src/max_cover.rs b/beacon_node/operation_pool/src/max_cover.rs index 8e50b8152e..2e629f786b 100644 --- a/beacon_node/operation_pool/src/max_cover.rs +++ b/beacon_node/operation_pool/src/max_cover.rs @@ -11,16 +11,21 @@ use itertools::Itertools; pub trait MaxCover: Clone { /// The result type, of which we would eventually like a collection of maximal quality. type Object: Clone; + /// The intermediate object type, which can be converted to `Object`. + type Intermediate: Clone; /// The type used to represent sets. type Set: Clone; - /// Extract an object for inclusion in a solution. - fn object(&self) -> &Self::Object; + /// Extract the intermediate object. + fn intermediate(&self) -> &Self::Intermediate; + + /// Convert the borrowed intermediate object to an owned object for the solution. 
+ fn convert_to_object(intermediate: &Self::Intermediate) -> Self::Object; /// Get the set of elements covered. fn covering_set(&self) -> &Self::Set; /// Update the set of items covered, for the inclusion of some object in the solution. - fn update_covering_set(&mut self, max_obj: &Self::Object, max_set: &Self::Set); + fn update_covering_set(&mut self, max_obj: &Self::Intermediate, max_set: &Self::Set); /// The quality of this item's covering set, usually its cardinality. fn score(&self) -> usize; } @@ -86,7 +91,7 @@ where .filter(|x| x.available && x.item.score() != 0) .for_each(|x| { x.item - .update_covering_set(best.object(), best.covering_set()) + .update_covering_set(best.intermediate(), best.covering_set()) }); result.push(best); @@ -106,7 +111,7 @@ where .into_iter() .merge_by(cover2, |item1, item2| item1.score() >= item2.score()) .take(limit) - .map(|item| item.object().clone()) + .map(|item| T::convert_to_object(item.intermediate())) .collect() } @@ -121,12 +126,17 @@ mod test { T: Clone + Eq + Hash, { type Object = Self; + type Intermediate = Self; type Set = Self; - fn object(&self) -> &Self { + fn intermediate(&self) -> &Self { self } + fn convert_to_object(set: &Self) -> Self { + set.clone() + } + fn covering_set(&self) -> &Self { self } diff --git a/beacon_node/operation_pool/src/metrics.rs b/beacon_node/operation_pool/src/metrics.rs index 3fa5208a3d..6fd8567cef 100644 --- a/beacon_node/operation_pool/src/metrics.rs +++ b/beacon_node/operation_pool/src/metrics.rs @@ -3,6 +3,10 @@ use lazy_static::lazy_static; pub use lighthouse_metrics::*; lazy_static! 
{ + pub static ref BUILD_REWARD_CACHE_TIME: Result = try_create_histogram( + "op_pool_build_reward_cache_time", + "Time to build the reward cache before packing attestations" + ); pub static ref ATTESTATION_PREV_EPOCH_PACKING_TIME: Result = try_create_histogram( "op_pool_attestation_prev_epoch_packing_time", "Time to pack previous epoch attestations" diff --git a/beacon_node/operation_pool/src/persistence.rs b/beacon_node/operation_pool/src/persistence.rs index 0769786097..ed15369df7 100644 --- a/beacon_node/operation_pool/src/persistence.rs +++ b/beacon_node/operation_pool/src/persistence.rs @@ -1,12 +1,13 @@ use crate::attestation_id::AttestationId; +use crate::attestation_storage::AttestationMap; use crate::sync_aggregate_id::SyncAggregateId; use crate::OpPoolError; use crate::OperationPool; use derivative::Derivative; use parking_lot::RwLock; -use serde_derive::{Deserialize, Serialize}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; +use state_processing::SigVerifiedOp; use store::{DBColumn, Error as StoreError, StoreItem}; use types::*; @@ -17,32 +18,42 @@ type PersistedSyncContributions = Vec<(SyncAggregateId, Vec { - /// Mapping from attestation ID to attestation mappings. - // We could save space by not storing the attestation ID, but it might - // be difficult to make that roundtrip due to eager aggregation. - attestations: Vec<(AttestationId, Vec>)>, + /// [DEPRECATED] Mapping from attestation ID to attestation mappings. + #[superstruct(only(V5))] + pub attestations_v5: Vec<(AttestationId, Vec>)>, + /// Attestations and their attesting indices. + #[superstruct(only(V12))] + pub attestations: Vec<(Attestation, Vec)>, /// Mapping from sync contribution ID to sync contributions and aggregate. - #[superstruct(only(Altair))] - sync_contributions: PersistedSyncContributions, + pub sync_contributions: PersistedSyncContributions, + /// [DEPRECATED] Attester slashings. 
+ #[superstruct(only(V5))] + pub attester_slashings_v5: Vec<(AttesterSlashing, ForkVersion)>, /// Attester slashings. - attester_slashings: Vec<(AttesterSlashing, ForkVersion)>, - /// Proposer slashings. - proposer_slashings: Vec, - /// Voluntary exits. - voluntary_exits: Vec, + #[superstruct(only(V12))] + pub attester_slashings: Vec, T>>, + /// [DEPRECATED] Proposer slashings. + #[superstruct(only(V5))] + pub proposer_slashings_v5: Vec, + /// Proposer slashings with fork information. + #[superstruct(only(V12))] + pub proposer_slashings: Vec>, + /// [DEPRECATED] Voluntary exits. + #[superstruct(only(V5))] + pub voluntary_exits_v5: Vec, + /// Voluntary exits with fork information. + #[superstruct(only(V12))] + pub voluntary_exits: Vec>, } impl PersistedOperationPool { @@ -52,7 +63,12 @@ impl PersistedOperationPool { .attestations .read() .iter() - .map(|(att_id, att)| (att_id.clone(), att.clone())) + .map(|att| { + ( + att.clone_as_attestation(), + att.indexed.attesting_indices.clone(), + ) + }) .collect(); let sync_contributions = operation_pool @@ -83,7 +99,7 @@ impl PersistedOperationPool { .map(|(_, exit)| exit.clone()) .collect(); - PersistedOperationPool::Altair(PersistedOperationPoolAltair { + PersistedOperationPool::V12(PersistedOperationPoolV12 { attestations, sync_contributions, attester_slashings, @@ -92,45 +108,62 @@ impl PersistedOperationPool { }) } - /// Reconstruct an `OperationPool`. Sets `sync_contributions` to its `Default` if `self` matches - /// `PersistedOperationPool::Base`. + /// Reconstruct an `OperationPool`. pub fn into_operation_pool(self) -> Result, OpPoolError> { - let attestations = RwLock::new(self.attestations().iter().cloned().collect()); - let attester_slashings = RwLock::new(self.attester_slashings().iter().cloned().collect()); + let attester_slashings = RwLock::new(self.attester_slashings()?.iter().cloned().collect()); let proposer_slashings = RwLock::new( - self.proposer_slashings() + self.proposer_slashings()? 
.iter() .cloned() - .map(|slashing| (slashing.signed_header_1.message.proposer_index, slashing)) + .map(|slashing| (slashing.as_inner().proposer_index(), slashing)) .collect(), ); let voluntary_exits = RwLock::new( - self.voluntary_exits() + self.voluntary_exits()? .iter() .cloned() - .map(|exit| (exit.message.validator_index, exit)) + .map(|exit| (exit.as_inner().message.validator_index, exit)) .collect(), ); - let op_pool = match self { - PersistedOperationPool::Altair(_) => { - let sync_contributions = - RwLock::new(self.sync_contributions()?.iter().cloned().collect()); - - OperationPool { - attestations, - sync_contributions, - attester_slashings, - proposer_slashings, - voluntary_exits, - _phantom: Default::default(), + let sync_contributions = RwLock::new(self.sync_contributions().iter().cloned().collect()); + let attestations = match self { + PersistedOperationPool::V5(_) => return Err(OpPoolError::IncorrectOpPoolVariant), + PersistedOperationPool::V12(pool) => { + let mut map = AttestationMap::default(); + for (att, attesting_indices) in pool.attestations { + map.insert(att, attesting_indices); } + RwLock::new(map) } }; + let op_pool = OperationPool { + attestations, + sync_contributions, + attester_slashings, + proposer_slashings, + voluntary_exits, + reward_cache: Default::default(), + _phantom: Default::default(), + }; Ok(op_pool) } } -/// Deserialization for `PersistedOperationPool` defaults to `PersistedOperationPool::Altair`. +impl StoreItem for PersistedOperationPoolV5 { + fn db_column() -> DBColumn { + DBColumn::OpPool + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + PersistedOperationPoolV5::from_ssz_bytes(bytes).map_err(Into::into) + } +} + +/// Deserialization for `PersistedOperationPool` defaults to `PersistedOperationPool::V12`. 
impl StoreItem for PersistedOperationPool { fn db_column() -> DBColumn { DBColumn::OpPool @@ -141,9 +174,9 @@ impl StoreItem for PersistedOperationPool { } fn from_store_bytes(bytes: &[u8]) -> Result { - // Default deserialization to the Altair variant. - PersistedOperationPoolAltair::from_ssz_bytes(bytes) - .map(Self::Altair) + // Default deserialization to the latest variant. + PersistedOperationPoolV12::from_ssz_bytes(bytes) + .map(Self::V12) .map_err(Into::into) } } diff --git a/beacon_node/operation_pool/src/reward_cache.rs b/beacon_node/operation_pool/src/reward_cache.rs new file mode 100644 index 0000000000..5b9d4258e9 --- /dev/null +++ b/beacon_node/operation_pool/src/reward_cache.rs @@ -0,0 +1,122 @@ +use crate::OpPoolError; +use bitvec::vec::BitVec; +use types::{BeaconState, BeaconStateError, Epoch, EthSpec, Hash256, ParticipationFlags}; + +#[derive(Debug, PartialEq, Eq, Clone)] +struct Initialization { + current_epoch: Epoch, + latest_block_root: Hash256, +} + +/// Cache to store pre-computed information for block proposal. +#[derive(Debug, Clone, Default)] +pub struct RewardCache { + initialization: Option, + /// `BitVec` of validator indices which don't have default participation flags for the prev. epoch. + /// + /// We choose to only track whether validators have *any* participation flag set because + /// it's impossible to include a new attestation which is better than the existing participation + /// UNLESS the validator makes a slashable attestation, and we assume that this is rare enough + /// that it's acceptable to be slightly sub-optimal in this case. + previous_epoch_participation: BitVec, + /// `BitVec` of validator indices which don't have default participation flags for the current epoch. 
+ current_epoch_participation: BitVec, +} + +impl RewardCache { + pub fn has_attested_in_epoch( + &self, + validator_index: u64, + epoch: Epoch, + ) -> Result { + if let Some(init) = &self.initialization { + if init.current_epoch == epoch { + Ok(*self + .current_epoch_participation + .get(validator_index as usize) + .ok_or(OpPoolError::RewardCacheOutOfBounds)?) + } else if init.current_epoch == epoch + 1 { + Ok(*self + .previous_epoch_participation + .get(validator_index as usize) + .ok_or(OpPoolError::RewardCacheOutOfBounds)?) + } else { + Err(OpPoolError::RewardCacheWrongEpoch) + } + } else { + Err(OpPoolError::RewardCacheWrongEpoch) + } + } + + /// Return the root of the latest block applied to `state`. + /// + /// For simplicity at genesis we return the zero hash, which will cause one unnecessary + /// re-calculation in `update`. + fn latest_block_root(state: &BeaconState) -> Result { + if state.slot() == 0 { + Ok(Hash256::zero()) + } else { + Ok(*state + .get_block_root(state.slot() - 1) + .map_err(OpPoolError::RewardCacheGetBlockRoot)?) + } + } + + /// Update the cache. + pub fn update(&mut self, state: &BeaconState) -> Result<(), OpPoolError> { + if matches!(state, BeaconState::Base(_)) { + return Ok(()); + } + + let current_epoch = state.current_epoch(); + let latest_block_root = Self::latest_block_root(state)?; + + let new_init = Initialization { + current_epoch, + latest_block_root, + }; + + // The participation flags change every block, and will almost always need updating when + // this function is called at a new slot. 
+ if self + .initialization + .as_ref() + .map_or(true, |init| *init != new_init) + { + self.update_previous_epoch_participation(state) + .map_err(OpPoolError::RewardCacheUpdatePrevEpoch)?; + self.update_current_epoch_participation(state) + .map_err(OpPoolError::RewardCacheUpdateCurrEpoch)?; + + self.initialization = Some(new_init); + } + + Ok(()) + } + + fn update_previous_epoch_participation( + &mut self, + state: &BeaconState, + ) -> Result<(), BeaconStateError> { + let default_participation = ParticipationFlags::default(); + self.previous_epoch_participation = state + .previous_epoch_participation()? + .iter() + .map(|participation| *participation != default_participation) + .collect(); + Ok(()) + } + + fn update_current_epoch_participation( + &mut self, + state: &BeaconState, + ) -> Result<(), BeaconStateError> { + let default_participation = ParticipationFlags::default(); + self.current_epoch_participation = state + .current_epoch_participation()? + .iter() + .map(|participation| *participation != default_participation) + .collect(); + Ok(()) + } +} diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index bbb904717b..3df95a0a5d 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -727,6 +727,16 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .default_value("250") .takes_value(true) ) + .arg( + Arg::with_name("paranoid-block-proposal") + .long("paranoid-block-proposal") + .help("Paranoid enough to be reading the source? Nice. This flag reverts some \ + block proposal optimisations and forces the node to check every attestation \ + it includes super thoroughly. 
This may be useful in an emergency, but not \ + otherwise.") + .hidden(true) + .takes_value(false) + ) .arg( Arg::with_name("builder-fallback-skips") .long("builder-fallback-skips") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index e885275b04..f08981b103 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -644,6 +644,8 @@ pub fn get_config( client_config.chain.count_unrealized = clap_utils::parse_required(cli_args, "count-unrealized")?; + client_config.chain.paranoid_block_proposal = cli_args.is_present("paranoid-block-proposal"); + /* * Builder fallback configs. */ diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index d72dbcd23d..4f35c4c072 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -4,7 +4,7 @@ use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use types::{Checkpoint, Hash256, Slot}; -pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(11); +pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(12); // All the keys that get stored under the `BeaconMeta` column. // diff --git a/book/src/database-migrations.md b/book/src/database-migrations.md index de7ced1331..c31e373b48 100644 --- a/book/src/database-migrations.md +++ b/book/src/database-migrations.md @@ -23,6 +23,7 @@ validator client or the slasher**. | v2.4.0 | Jul 2022 | v9 | yes (pre Bellatrix) | | v2.5.0 | Aug 2022 | v11 | yes | | v3.0.0 | Aug 2022 | v11 | yes | +| v3.1.0 | Sep 2022 | v12 | yes | > **Note**: All point releases (e.g. v2.3.1) are schema-compatible with the prior minor release > (e.g. v2.3.0). 
diff --git a/consensus/state_processing/Cargo.toml b/consensus/state_processing/Cargo.toml index c7ed4b308d..46ac2bae57 100644 --- a/consensus/state_processing/Cargo.toml +++ b/consensus/state_processing/Cargo.toml @@ -14,6 +14,7 @@ bls = { path = "../../crypto/bls" } integer-sqrt = "0.1.5" itertools = "0.10.0" eth2_ssz = "0.4.1" +eth2_ssz_derive = "0.3.0" eth2_ssz_types = "0.2.2" merkle_proof = { path = "../merkle_proof" } safe_arith = { path = "../safe_arith" } @@ -26,6 +27,7 @@ smallvec = "1.6.1" arbitrary = { version = "1.0", features = ["derive"], optional = true } lighthouse_metrics = { path = "../../common/lighthouse_metrics", optional = true } lazy_static = { version = "1.4.0", optional = true } +derivative = "2.1.1" [features] default = ["legacy-arith", "metrics"] diff --git a/consensus/state_processing/src/common/get_attesting_indices.rs b/consensus/state_processing/src/common/get_attesting_indices.rs index fb636f861e..d7d02c3601 100644 --- a/consensus/state_processing/src/common/get_attesting_indices.rs +++ b/consensus/state_processing/src/common/get_attesting_indices.rs @@ -1,12 +1,10 @@ use types::*; /// Returns validator indices which participated in the attestation, sorted by increasing index. -/// -/// Spec v0.12.1 pub fn get_attesting_indices( committee: &[usize], bitlist: &BitList, -) -> Result, BeaconStateError> { +) -> Result, BeaconStateError> { if bitlist.len() != committee.len() { return Err(BeaconStateError::InvalidBitfield); } @@ -15,7 +13,7 @@ pub fn get_attesting_indices( for (i, validator_index) in committee.iter().enumerate() { if let Ok(true) = bitlist.get(i) { - indices.push(*validator_index) + indices.push(*validator_index as u64) } } @@ -23,3 +21,12 @@ pub fn get_attesting_indices( Ok(indices) } + +/// Shortcut for getting the attesting indices while fetching the committee from the state's cache. 
+pub fn get_attesting_indices_from_state( + state: &BeaconState, + att: &Attestation, +) -> Result, BeaconStateError> { + let committee = state.get_beacon_committee(att.data.slot, att.data.index)?; + get_attesting_indices::(committee.committee, &att.aggregation_bits) +} diff --git a/consensus/state_processing/src/common/get_indexed_attestation.rs b/consensus/state_processing/src/common/get_indexed_attestation.rs index daa1c09307..63f63698e4 100644 --- a/consensus/state_processing/src/common/get_indexed_attestation.rs +++ b/consensus/state_processing/src/common/get_indexed_attestation.rs @@ -14,9 +14,7 @@ pub fn get_indexed_attestation( let attesting_indices = get_attesting_indices::(committee, &attestation.aggregation_bits)?; Ok(IndexedAttestation { - attesting_indices: VariableList::new( - attesting_indices.into_iter().map(|x| x as u64).collect(), - )?, + attesting_indices: VariableList::new(attesting_indices)?, data: attestation.data.clone(), signature: attestation.signature.clone(), }) diff --git a/consensus/state_processing/src/common/mod.rs b/consensus/state_processing/src/common/mod.rs index 334a293ed5..8a2e2439bb 100644 --- a/consensus/state_processing/src/common/mod.rs +++ b/consensus/state_processing/src/common/mod.rs @@ -10,7 +10,7 @@ pub mod base; pub use deposit_data_tree::DepositDataTree; pub use get_attestation_participation::get_attestation_participation_flag_indices; -pub use get_attesting_indices::get_attesting_indices; +pub use get_attesting_indices::{get_attesting_indices, get_attesting_indices_from_state}; pub use get_indexed_attestation::get_indexed_attestation; pub use initiate_validator_exit::initiate_validator_exit; pub use slash_validator::slash_validator; diff --git a/consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs b/consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs index b40f91ce5a..26d2536e5f 100644 --- 
a/consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs +++ b/consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs @@ -278,8 +278,8 @@ impl ValidatorStatuses { // Loop through the participating validator indices and update the status vec. for validator_index in attesting_indices { self.statuses - .get_mut(validator_index) - .ok_or(BeaconStateError::UnknownValidator(validator_index))? + .get_mut(validator_index as usize) + .ok_or(BeaconStateError::UnknownValidator(validator_index as usize))? .update(&status); } } diff --git a/consensus/state_processing/src/upgrade/altair.rs b/consensus/state_processing/src/upgrade/altair.rs index 5e4fcbcf55..176f1af15c 100644 --- a/consensus/state_processing/src/upgrade/altair.rs +++ b/consensus/state_processing/src/upgrade/altair.rs @@ -32,8 +32,8 @@ pub fn translate_participation( for index in attesting_indices { for flag_index in &participation_flag_indices { epoch_participation - .get_mut(index) - .ok_or(Error::UnknownValidator(index))? + .get_mut(index as usize) + .ok_or(Error::UnknownValidator(index as usize))? 
.add_flag(*flag_index)?; } } diff --git a/consensus/state_processing/src/verify_operation.rs b/consensus/state_processing/src/verify_operation.rs index 25c2839edd..80dee28f62 100644 --- a/consensus/state_processing/src/verify_operation.rs +++ b/consensus/state_processing/src/verify_operation.rs @@ -5,36 +5,120 @@ use crate::per_block_processing::{ verify_attester_slashing, verify_exit, verify_proposer_slashing, }; use crate::VerifySignatures; +use derivative::Derivative; +use smallvec::{smallvec, SmallVec}; +use ssz::{Decode, Encode}; +use ssz_derive::{Decode, Encode}; +use std::marker::PhantomData; use types::{ - AttesterSlashing, BeaconState, ChainSpec, EthSpec, ProposerSlashing, SignedVoluntaryExit, + AttesterSlashing, BeaconState, ChainSpec, Epoch, EthSpec, Fork, ForkVersion, ProposerSlashing, + SignedVoluntaryExit, }; +const MAX_FORKS_VERIFIED_AGAINST: usize = 2; + /// Wrapper around an operation type that acts as proof that its signature has been checked. /// -/// The inner field is private, meaning instances of this type can only be constructed +/// The inner `op` field is private, meaning instances of this type can only be constructed /// by calling `validate`. -#[derive(Debug, PartialEq, Eq, Clone)] -pub struct SigVerifiedOp(T); +#[derive(Derivative, Debug, Clone, Encode, Decode)] +#[derivative( + PartialEq, + Eq, + Hash(bound = "T: Encode + Decode + std::hash::Hash, E: EthSpec") +)] +pub struct SigVerifiedOp { + op: T, + verified_against: VerifiedAgainst, + #[ssz(skip_serializing, skip_deserializing)] + _phantom: PhantomData, +} + +/// Information about the fork versions that this message was verified against. +/// +/// In general it is not safe to assume that a `SigVerifiedOp` constructed at some point in the past +/// will continue to be valid in the presence of a changing `state.fork()`. The reason for this +/// is that the fork versions that the message's epochs map to might change. 
+/// +/// For example a proposer slashing at a phase0 slot verified against an Altair state will use +/// the phase0 fork version, but will become invalid once the Bellatrix fork occurs because that +/// slot will start to map to the Altair fork version. This is because `Fork::get_fork_version` only +/// remembers the most recent two forks. +/// +/// In the other direction, a proposer slashing at a Bellatrix slot verified against an Altair state +/// will use the Altair fork version, but will become invalid once the Bellatrix fork occurs because +/// that slot will start to map to the Bellatrix fork version. +/// +/// We need to store multiple `ForkVersion`s because attester slashings contain two indexed +/// attestations which may be signed using different versions. +#[derive(Debug, PartialEq, Eq, Clone, Hash, Encode, Decode)] +pub struct VerifiedAgainst { + fork_versions: SmallVec<[ForkVersion; MAX_FORKS_VERIFIED_AGAINST]>, +} + +impl SigVerifiedOp +where + T: VerifyOperation, + E: EthSpec, +{ + /// This function must be private because it assumes that `op` has already been verified. + fn new(op: T, state: &BeaconState) -> Self { + let verified_against = VerifiedAgainst { + fork_versions: op + .verification_epochs() + .into_iter() + .map(|epoch| state.fork().get_fork_version(epoch)) + .collect(), + }; + + SigVerifiedOp { + op, + verified_against, + _phantom: PhantomData, + } + } -impl SigVerifiedOp { pub fn into_inner(self) -> T { - self.0 + self.op } pub fn as_inner(&self) -> &T { - &self.0 + &self.op + } + + pub fn signature_is_still_valid(&self, current_fork: &Fork) -> bool { + self.as_inner() + .verification_epochs() + .into_iter() + .zip(self.verified_against.fork_versions.iter()) + .all(|(epoch, verified_fork_version)| { + current_fork.get_fork_version(epoch) == *verified_fork_version + }) + } + + /// Return one of the fork versions this message was verified against. 
+ /// + /// This is only required for the v12 schema downgrade and can be deleted once all nodes + /// are upgraded to v12. + pub fn first_fork_verified_against(&self) -> Option { + self.verified_against.fork_versions.first().copied() } } /// Trait for operations that can be verified and transformed into a `SigVerifiedOp`. -pub trait VerifyOperation: Sized { +pub trait VerifyOperation: Encode + Decode + Sized { type Error; fn validate( self, state: &BeaconState, spec: &ChainSpec, - ) -> Result, Self::Error>; + ) -> Result, Self::Error>; + + /// Return the epochs at which parts of this message were verified. + /// + /// These need to map 1-to-1 to the `SigVerifiedOp::verified_against` for this type. + fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]>; } impl VerifyOperation for SignedVoluntaryExit { @@ -44,9 +128,14 @@ impl VerifyOperation for SignedVoluntaryExit { self, state: &BeaconState, spec: &ChainSpec, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { verify_exit(state, &self, VerifySignatures::True, spec)?; - Ok(SigVerifiedOp(self)) + Ok(SigVerifiedOp::new(self, state)) + } + + #[allow(clippy::integer_arithmetic)] + fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> { + smallvec![self.message.epoch] } } @@ -57,9 +146,17 @@ impl VerifyOperation for AttesterSlashing { self, state: &BeaconState, spec: &ChainSpec, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { verify_attester_slashing(state, &self, VerifySignatures::True, spec)?; - Ok(SigVerifiedOp(self)) + Ok(SigVerifiedOp::new(self, state)) + } + + #[allow(clippy::integer_arithmetic)] + fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> { + smallvec![ + self.attestation_1.data.target.epoch, + self.attestation_2.data.target.epoch + ] } } @@ -70,8 +167,18 @@ impl VerifyOperation for ProposerSlashing { self, state: &BeaconState, spec: &ChainSpec, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { 
verify_proposer_slashing(&self, state, VerifySignatures::True, spec)?; - Ok(SigVerifiedOp(self)) + Ok(SigVerifiedOp::new(self, state)) + } + + #[allow(clippy::integer_arithmetic)] + fn verification_epochs(&self) -> SmallVec<[Epoch; MAX_FORKS_VERIFIED_AGAINST]> { + // Only need a single epoch because the slots of the two headers must be equal. + smallvec![self + .signed_header_1 + .message + .slot + .epoch(E::slots_per_epoch())] } } diff --git a/consensus/types/src/proposer_slashing.rs b/consensus/types/src/proposer_slashing.rs index ff12b0611a..ca048b149a 100644 --- a/consensus/types/src/proposer_slashing.rs +++ b/consensus/types/src/proposer_slashing.rs @@ -18,6 +18,13 @@ pub struct ProposerSlashing { pub signed_header_2: SignedBeaconBlockHeader, } +impl ProposerSlashing { + /// Get proposer index, assuming slashing validity has already been checked. + pub fn proposer_index(&self) -> u64 { + self.signed_header_1.message.proposer_index + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 7fd4ad91cf..14934a5669 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -132,6 +132,21 @@ fn fork_choice_before_proposal_timeout_zero() { .with_config(|config| assert_eq!(config.chain.fork_choice_before_proposal_timeout_ms, 0)); } +#[test] +fn paranoid_block_proposal_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| assert!(!config.chain.paranoid_block_proposal)); +} + +#[test] +fn paranoid_block_proposal_on() { + CommandLineTest::new() + .flag("paranoid-block-proposal", None) + .run_with_zero_port() + .with_config(|config| assert!(config.chain.paranoid_block_proposal)); +} + #[test] fn count_unrealized_default() { CommandLineTest::new()