From 84ae7b29768ca0dfc3efdb6317a334e747dd5eac Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 24 Oct 2022 13:43:52 +1100 Subject: [PATCH] Cache indexed attestations --- beacon_node/beacon_chain/src/beacon_chain.rs | 6 +- .../beacon_chain/src/block_verification.rs | 47 +++++++------ consensus/ssz_types/src/bitfield.rs | 6 +- .../state_processing/src/consensus_context.rs | 41 ++++++++++-- .../src/per_block_processing.rs | 5 +- .../block_signature_verifier.rs | 66 +++++++++---------- .../process_operations.rs | 25 +++++-- .../verify_attestation.rs | 19 +++--- lcli/src/transition_blocks.rs | 43 ++++++------ 9 files changed, 152 insertions(+), 106 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 52f0440f0a..a8c0f6c24b 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -140,10 +140,6 @@ const MAX_PER_SLOT_FORK_CHOICE_DISTANCE: u64 = 256; pub const INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON: &str = "Justified block has an invalid execution payload."; -// FIXME(sproul): decide whether to keep this -// Interval before the attestation deadline during which to consider blocks "borderline" late. -// const BORDERLINE_LATE_BLOCK_TOLERANCE: Duration = Duration::from_millis(50); - pub const INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON: &str = "Finalized merge transition block is invalid."; @@ -3369,10 +3365,12 @@ impl BeaconChain { // This will be a lot slower but guards against bugs in block production and can be // quickly rolled out without a release. if self.config.paranoid_block_proposal { + let mut tmp_ctxt = ConsensusContext::new(state.slot()); attestations.retain(|att| { verify_attestation_for_block_inclusion( &state, att, + &mut tmp_ctxt, VerifySignatures::True, &self.spec, ) diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 43daabf1ea..084b12411f 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -547,8 +547,22 @@ pub fn signature_verify_chain_segment( let pubkey_cache = get_validator_pubkey_cache(chain)?; let mut signature_verifier = get_signature_verifier::(&state, &pubkey_cache, &chain.spec); + let mut signature_verified_blocks = Vec::with_capacity(chain_segment.len()); + for (block_root, block) in &chain_segment { - signature_verifier.include_all_signatures(block, Some(*block_root), None)?; + let mut consensus_context = + ConsensusContext::new(block.slot()).set_current_block_root(*block_root); + + signature_verifier.include_all_signatures(&block, &mut consensus_context)?; + + // Save the block and its consensus context. The context will have had its proposer index + // and attesting indices filled in, which can be used to accelerate later block processing. + signature_verified_blocks.push(SignatureVerifiedBlock { + block: block.clone(), + block_root: *block_root, + parent: None, + consensus_context, + }); } if signature_verifier.verify().is_err() { @@ -557,22 +571,6 @@ pub fn signature_verify_chain_segment( drop(pubkey_cache); - let mut signature_verified_blocks = chain_segment - .into_iter() - .map(|(block_root, block)| { - // Proposer index has already been verified above during signature verification. - let consensus_context = ConsensusContext::new(block.slot()) - .set_current_block_root(block_root) - .set_proposer_index(block.message().proposer_index()); - SignatureVerifiedBlock { - block, - block_root, - parent: None, - consensus_context, - } - }) - .collect::>(); - if let Some(signature_verified_block) = signature_verified_blocks.first_mut() { signature_verified_block.parent = Some(parent); } @@ -941,13 +939,14 @@ impl SignatureVerifiedBlock { let mut signature_verifier = get_signature_verifier::(&state, &pubkey_cache, &chain.spec); - signature_verifier.include_all_signatures(&block, Some(block_root), None)?; + let mut consensus_context = + ConsensusContext::new(block.slot()).set_current_block_root(block_root); + + signature_verifier.include_all_signatures(&block, &mut consensus_context)?; if signature_verifier.verify().is_ok() { Ok(Self { - consensus_context: ConsensusContext::new(block.slot()) - .set_current_block_root(block_root) - .set_proposer_index(block.message().proposer_index()), + consensus_context, block, block_root, parent: Some(parent), @@ -993,16 +992,16 @@ impl SignatureVerifiedBlock { // Gossip verification has already checked the proposer index. Use it to check the RANDAO // signature. - let verified_proposer_index = Some(block.message().proposer_index()); + let mut consensus_context = from.consensus_context; signature_verifier - .include_all_signatures_except_proposal(&block, verified_proposer_index)?; + .include_all_signatures_except_proposal(&block, &mut consensus_context)?; if signature_verifier.verify().is_ok() { Ok(Self { block, block_root: from.block_root, parent: Some(parent), - consensus_context: from.consensus_context, + consensus_context, }) } else { Err(BlockError::InvalidSignature) diff --git a/consensus/ssz_types/src/bitfield.rs b/consensus/ssz_types/src/bitfield.rs index b0cf4551ee..0539cc7d2c 100644 --- a/consensus/ssz_types/src/bitfield.rs +++ b/consensus/ssz_types/src/bitfield.rs @@ -22,7 +22,7 @@ pub trait BitfieldBehaviour: Clone {} /// A marker struct used to declare SSZ `Variable` behaviour on a `Bitfield`. /// /// See the [`Bitfield`](struct.Bitfield.html) docs for usage. -#[derive(Clone, PartialEq, Debug)] +#[derive(Clone, PartialEq, Eq, Debug)] pub struct Variable { _phantom: PhantomData, } @@ -30,7 +30,7 @@ pub struct Variable { /// A marker struct used to declare SSZ `Fixed` behaviour on a `Bitfield`. /// /// See the [`Bitfield`](struct.Bitfield.html) docs for usage. -#[derive(Clone, PartialEq, Debug)] +#[derive(Clone, PartialEq, Eq, Debug)] pub struct Fixed { _phantom: PhantomData, } @@ -96,7 +96,7 @@ pub type BitVector = Bitfield>; /// byte (by `Vec` index) stores the lowest bit-indices and the right-most bit stores the lowest /// bit-index. E.g., `smallvec![0b0000_0001, 0b0000_0010]` has bits `0, 9` set. #[derive(Clone, Debug, Derivative)] -#[derivative(PartialEq, Hash(bound = ""))] +#[derivative(PartialEq, Eq, Hash(bound = ""))] pub struct Bitfield { bytes: SmallVec<[u8; SMALLVEC_LEN]>, len: usize, diff --git a/consensus/state_processing/src/consensus_context.rs b/consensus/state_processing/src/consensus_context.rs index 1e19587350..0766092d52 100644 --- a/consensus/state_processing/src/consensus_context.rs +++ b/consensus/state_processing/src/consensus_context.rs @@ -1,10 +1,13 @@ +use crate::common::get_indexed_attestation; +use crate::per_block_processing::errors::{AttestationInvalid, BlockOperationError}; use crate::{EpochCache, EpochCacheError}; use std::borrow::Cow; +use std::collections::{hash_map::Entry, HashMap}; use std::marker::PhantomData; use tree_hash::TreeHash; use types::{ - BeaconState, BeaconStateError, ChainSpec, EthSpec, ExecPayload, Hash256, SignedBeaconBlock, - Slot, + Attestation, AttestationData, BeaconState, BeaconStateError, BitList, ChainSpec, EthSpec, + ExecPayload, Hash256, IndexedAttestation, SignedBeaconBlock, Slot, }; #[derive(Debug, Clone)] @@ -17,6 +20,9 @@ pub struct ConsensusContext { current_block_root: Option, /// Epoch cache of values that are useful for block processing that are static over an epoch. epoch_cache: Option, + /// Cache of indexed attestations constructed during block processing. + indexed_attestations: + HashMap<(AttestationData, BitList), IndexedAttestation>, _phantom: PhantomData, } @@ -46,6 +52,7 @@ impl ConsensusContext { proposer_index: None, current_block_root: None, epoch_cache: None, + indexed_attestations: HashMap::new(), _phantom: PhantomData, } } @@ -107,9 +114,9 @@ impl ConsensusContext { self } - pub fn get_base_reward( + pub fn get_base_reward( &mut self, - state: &BeaconState, + state: &BeaconState, validator_index: usize, spec: &ChainSpec, ) -> Result { @@ -126,4 +133,30 @@ impl ConsensusContext { Ok(epoch_cache.get_base_reward(validator_index)?) } + + pub fn get_indexed_attestation( + &mut self, + state: &BeaconState, + attestation: &Attestation, + ) -> Result<&IndexedAttestation, BlockOperationError> { + let key = ( + attestation.data.clone(), + attestation.aggregation_bits.clone(), + ); + + match self.indexed_attestations.entry(key) { + Entry::Occupied(occupied) => Ok(occupied.into_mut()), + Entry::Vacant(vacant) => { + let committee = + state.get_beacon_committee(attestation.data.slot, attestation.data.index)?; + let indexed_attestation = + get_indexed_attestation(committee.committee, attestation)?; + Ok(vacant.insert(indexed_attestation)) + } + } + } + + pub fn num_cached_indexed_attestations(&self) -> usize { + self.indexed_attestations.len() + } } diff --git a/consensus/state_processing/src/per_block_processing.rs b/consensus/state_processing/src/per_block_processing.rs index ae659448b9..c2f84e2508 100644 --- a/consensus/state_processing/src/per_block_processing.rs +++ b/consensus/state_processing/src/per_block_processing.rs @@ -111,16 +111,13 @@ pub fn per_block_processing>( let verify_signatures = match block_signature_strategy { BlockSignatureStrategy::VerifyBulk => { // Verify all signatures in the block at once. - let block_root = Some(ctxt.get_current_block_root(signed_block)?); - let proposer_index = Some(ctxt.get_proposer_index(state, spec)?); block_verify!( BlockSignatureVerifier::verify_entire_block( state, |i| get_pubkey_from_state(state, i), |pk_bytes| pk_bytes.decompress().ok().map(Cow::Owned), signed_block, - block_root, - proposer_index, + ctxt, spec ) .is_ok(), diff --git a/consensus/state_processing/src/per_block_processing/block_signature_verifier.rs b/consensus/state_processing/src/per_block_processing/block_signature_verifier.rs index 7584df14ec..3387a3df77 100644 --- a/consensus/state_processing/src/per_block_processing/block_signature_verifier.rs +++ b/consensus/state_processing/src/per_block_processing/block_signature_verifier.rs @@ -1,14 +1,13 @@ #![allow(clippy::integer_arithmetic)] use super::signature_sets::{Error as SignatureSetError, *}; -use crate::common::get_indexed_attestation; use crate::per_block_processing::errors::{AttestationInvalid, BlockOperationError}; +use crate::{ConsensusContext, ContextError}; use bls::{verify_signature_sets, PublicKey, PublicKeyBytes, SignatureSet}; use rayon::prelude::*; use std::borrow::Cow; use types::{ - BeaconState, BeaconStateError, ChainSpec, EthSpec, ExecPayload, Hash256, IndexedAttestation, - SignedBeaconBlock, + BeaconState, BeaconStateError, ChainSpec, EthSpec, ExecPayload, Hash256, SignedBeaconBlock, }; pub type Result = std::result::Result; @@ -28,6 +27,8 @@ pub enum Error { IncorrectBlockProposer { block: u64, local_shuffling: u64 }, /// Failed to load a signature set. The block may be invalid or we failed to process it. SignatureSetError(SignatureSetError), + /// Error related to the consensus context, likely the proposer index or block root calc. + ContextError(ContextError), } impl From for Error { @@ -36,6 +37,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: ContextError) -> Error { + Error::ContextError(e) + } +} + impl From for Error { fn from(e: SignatureSetError) -> Error { match e { @@ -122,12 +129,11 @@ where get_pubkey: F, decompressor: D, block: &'a SignedBeaconBlock, - block_root: Option, - verified_proposer_index: Option, + ctxt: &mut ConsensusContext, spec: &'a ChainSpec, ) -> Result<()> { let mut verifier = Self::new(state, get_pubkey, decompressor, spec); - verifier.include_all_signatures(block, block_root, verified_proposer_index)?; + verifier.include_all_signatures(block, ctxt)?; verifier.verify() } @@ -135,11 +141,13 @@ where pub fn include_all_signatures>( &mut self, block: &'a SignedBeaconBlock, - block_root: Option, - verified_proposer_index: Option, + ctxt: &mut ConsensusContext, ) -> Result<()> { + let block_root = Some(ctxt.get_current_block_root(block)?); + let verified_proposer_index = Some(ctxt.get_proposer_index(self.state, self.spec)?); + self.include_block_proposal(block, block_root, verified_proposer_index)?; - self.include_all_signatures_except_proposal(block, verified_proposer_index)?; + self.include_all_signatures_except_proposal(block, ctxt)?; Ok(()) } @@ -149,12 +157,13 @@ where pub fn include_all_signatures_except_proposal>( &mut self, block: &'a SignedBeaconBlock, - verified_proposer_index: Option, + ctxt: &mut ConsensusContext, ) -> Result<()> { + let verified_proposer_index = Some(ctxt.get_proposer_index(self.state, self.spec)?); self.include_randao_reveal(block, verified_proposer_index)?; self.include_proposer_slashings(block)?; self.include_attester_slashings(block)?; - self.include_attestations(block)?; + self.include_attestations(block, ctxt)?; // Deposits are not included because they can legally have invalid signatures. self.include_exits(block)?; self.include_sync_aggregate(block)?; @@ -260,7 +269,8 @@ where pub fn include_attestations>( &mut self, block: &'a SignedBeaconBlock, - ) -> Result>> { + ctxt: &mut ConsensusContext, + ) -> Result<()> { self.sets .sets .reserve(block.message().body().attestations().len()); @@ -270,28 +280,18 @@ where .body() .attestations() .iter() - .try_fold( - Vec::with_capacity(block.message().body().attestations().len()), - |mut vec, attestation| { - let committee = self - .state - .get_beacon_committee(attestation.data.slot, attestation.data.index)?; - let indexed_attestation = - get_indexed_attestation(committee.committee, attestation)?; + .try_for_each(|attestation| { + let indexed_attestation = ctxt.get_indexed_attestation(&self.state, attestation)?; - self.sets.push(indexed_attestation_signature_set( - self.state, - self.get_pubkey.clone(), - &attestation.signature, - &indexed_attestation, - self.spec, - )?); - - vec.push(indexed_attestation); - - Ok(vec) - }, - ) + self.sets.push(indexed_attestation_signature_set( + self.state, + self.get_pubkey.clone(), + &attestation.signature, + indexed_attestation, + self.spec, + )?); + Ok(()) + }) .map_err(Error::into) } diff --git a/consensus/state_processing/src/per_block_processing/process_operations.rs b/consensus/state_processing/src/per_block_processing/process_operations.rs index d164721440..83ebc15cb0 100644 --- a/consensus/state_processing/src/per_block_processing/process_operations.rs +++ b/consensus/state_processing/src/per_block_processing/process_operations.rs @@ -57,8 +57,14 @@ pub mod base { // Verify and apply each attestation. for (i, attestation) in attestations.iter().enumerate() { - verify_attestation_for_block_inclusion(state, attestation, verify_signatures, spec) - .map_err(|e| e.into_with_index(i))?; + verify_attestation_for_block_inclusion( + state, + attestation, + ctxt, + verify_signatures, + spec, + ) + .map_err(|e| e.into_with_index(i))?; let pending_attestation = PendingAttestation { aggregation_bits: attestation.aggregation_bits.clone(), @@ -115,9 +121,16 @@ pub mod altair { let proposer_index = ctxt.get_proposer_index(state, spec)?; - let indexed_attestation = - verify_attestation_for_block_inclusion(state, attestation, verify_signatures, spec) - .map_err(|e| e.into_with_index(att_index))?; + let attesting_indices = verify_attestation_for_block_inclusion( + state, + attestation, + ctxt, + verify_signatures, + spec, + ) + .map_err(|e| e.into_with_index(att_index))? + .attesting_indices + .clone(); // Matching roots, participation flag indices let data = &attestation.data; @@ -127,7 +140,7 @@ pub mod altair { // Update epoch participation flags. let mut proposer_reward_numerator = 0; - for index in &indexed_attestation.attesting_indices { + for index in &attesting_indices { let index = *index as usize; for (flag_index, &weight) in PARTICIPATION_FLAG_WEIGHTS.iter().enumerate() { diff --git a/consensus/state_processing/src/per_block_processing/verify_attestation.rs b/consensus/state_processing/src/per_block_processing/verify_attestation.rs index 5d8113af4f..303a6e3913 100644 --- a/consensus/state_processing/src/per_block_processing/verify_attestation.rs +++ b/consensus/state_processing/src/per_block_processing/verify_attestation.rs @@ -1,7 +1,7 @@ use super::errors::{AttestationInvalid as Invalid, BlockOperationError}; use super::VerifySignatures; -use crate::common::get_indexed_attestation; use crate::per_block_processing::is_valid_indexed_attestation; +use crate::ConsensusContext; use safe_arith::SafeArith; use types::*; @@ -15,12 +15,13 @@ fn error(reason: Invalid) -> BlockOperationError { /// to `state`. Otherwise, returns a descriptive `Err`. /// /// Optionally verifies the aggregate signature, depending on `verify_signatures`. -pub fn verify_attestation_for_block_inclusion( +pub fn verify_attestation_for_block_inclusion<'ctxt, T: EthSpec>( state: &BeaconState, attestation: &Attestation, + ctxt: &'ctxt mut ConsensusContext, verify_signatures: VerifySignatures, spec: &ChainSpec, -) -> Result> { +) -> Result<&'ctxt IndexedAttestation> { let data = &attestation.data; verify!( @@ -39,7 +40,7 @@ pub fn verify_attestation_for_block_inclusion( } ); - verify_attestation_for_state(state, attestation, verify_signatures, spec) + verify_attestation_for_state(state, attestation, ctxt, verify_signatures, spec) } /// Returns `Ok(())` if `attestation` is a valid attestation to the chain that precedes the given @@ -49,12 +50,13 @@ pub fn verify_attestation_for_block_inclusion( /// prior blocks in `state`. /// /// Spec v0.12.1 -pub fn verify_attestation_for_state( +pub fn verify_attestation_for_state<'ctxt, T: EthSpec>( state: &BeaconState, attestation: &Attestation, + ctxt: &'ctxt mut ConsensusContext, verify_signatures: VerifySignatures, spec: &ChainSpec, -) -> Result> { +) -> Result<&'ctxt IndexedAttestation> { let data = &attestation.data; verify!( @@ -66,9 +68,8 @@ pub fn verify_attestation_for_state( verify_casper_ffg_vote(attestation, state)?; // Check signature and bitfields - let committee = state.get_beacon_committee(attestation.data.slot, attestation.data.index)?; - let indexed_attestation = get_indexed_attestation(committee.committee, attestation)?; - is_valid_indexed_attestation(state, &indexed_attestation, verify_signatures, spec)?; + let indexed_attestation = ctxt.get_indexed_attestation(state, attestation)?; + is_valid_indexed_attestation(state, indexed_attestation, verify_signatures, spec)?; Ok(indexed_attestation) } diff --git a/lcli/src/transition_blocks.rs b/lcli/src/transition_blocks.rs index dbdb33b351..890c5b1661 100644 --- a/lcli/src/transition_blocks.rs +++ b/lcli/src/transition_blocks.rs @@ -347,6 +347,23 @@ fn do_transition( .map_err(|e| format!("Unable to build caches: {:?}", e))?; debug!("Build all caches (again): {:?}", t.elapsed()); + let mut ctxt = if let Some(ctxt) = saved_ctxt { + ctxt.clone() + } else { + let mut ctxt = ConsensusContext::new(pre_state.slot()) + .set_current_block_root(block_root) + .set_proposer_index(block.message().proposer_index()); + + if config.exclude_cache_builds { + ctxt = ctxt.set_epoch_cache( + EpochCache::new(&pre_state, spec) + .map_err(|e| format!("unable to build epoch cache: {e:?}"))?, + ); + *saved_ctxt = Some(ctxt.clone()); + } + ctxt + }; + if !config.no_signature_verification { let get_pubkey = move |validator_index| { validator_pubkey_cache @@ -367,31 +384,19 @@ fn do_transition( get_pubkey, decompressor, &block, - Some(block_root), - Some(block.message().proposer_index()), + &mut ctxt, spec, ) .map_err(|e| format!("Invalid block signature: {:?}", e))?; debug!("Batch verify block signatures: {:?}", t.elapsed()); + + // Signature verification should prime the indexed attestation cache. + assert_eq!( + ctxt.num_cached_indexed_attestations(), + block.message().body().attestations().len() + ); } - let mut ctxt = if let Some(ctxt) = saved_ctxt { - ctxt.clone() - } else { - let mut ctxt = ConsensusContext::new(pre_state.slot()) - .set_current_block_root(block_root) - .set_proposer_index(block.message().proposer_index()); - - if config.exclude_cache_builds { - ctxt = ctxt.set_epoch_cache( - EpochCache::new(&pre_state, spec) - .map_err(|e| format!("unable to build epoch cache: {e:?}"))?, - ); - *saved_ctxt = Some(ctxt.clone()); - } - ctxt - }; - let t = Instant::now(); per_block_processing(