Optimize attestation processing (#841)

* Start updating types

* WIP

* Signature hacking

* Existing EF tests passing with fake_crypto

* Updates

* Delete outdated API spec

* The refactor continues

* It compiles

* WIP test fixes

* All release tests passing bar genesis state parsing

* Update and test YamlConfig

* Update to spec v0.10 compatible BLS

* Updates to BLS EF tests

* Add EF test for AggregateVerify

And delete unused hash2curve tests for uncompressed points

* Update EF tests to v0.10.1

* Use optional block root correctly in block proc

* Use genesis fork in deposit domain. All tests pass

* Cargo fmt

* Fast aggregate verify test

* Update REST API docs

* Cargo fmt

* Fix unused import

* Bump spec tags to v0.10.1

* Add `seconds_per_eth1_block` to chainspec

* Update to timestamp based eth1 voting scheme

* Return None from `get_votes_to_consider` if block cache is empty

* Handle overflows in `is_candidate_block`

* Revert to failing tests

* Fix eth1 data sets test

* Choose default vote according to spec

* Fix collect_valid_votes tests

* Fix `get_votes_to_consider` to choose all eligible blocks

* Uncomment winning_vote tests

* Add comments; remove unused code

* Reduce seconds_per_eth1_block for simulation

* Addressed review comments

* Add test for default vote case

* Fix logs

* Remove unused functions

* Meter default eth1 votes

* Fix comments

* Address review comments; remove unused dependency

* Add first attempt at attestation proc. re-write

* Add version 2 of attestation processing

* Minor fixes

* Add validator pubkey cache

* Make get_indexed_attestation take a committee

* Link signature processing into new attn verification

* First working version

* Ensure pubkey cache is updated

* Add more metrics, slight optimizations

* Clone committee cache during attestation processing

* Update shuffling cache during block processing

* Remove old commented-out code

* Fix shuffling cache insert bug

* Used indexed attestation in fork choice

* Restructure attn processing, add metrics

* Add more detailed metrics

* Tidy, fix failing tests

* Fix failing tests, tidy

* Disable/delete two outdated tests

* Tidy

* Add pubkey cache persistence file

* Add more comments

* Integrate persistence file into builder

* Add pubkey cache tests

* Add data_dir to beacon chain builder

* Remove Option in pubkey cache persistence file

* Ensure consistency between datadir/data_dir

* Fix failing network test

* Tidy

* Fix todos

* Add attestation processing tests

* Add another test

* Only run attestation tests in release

* Make attestation tests MainnetEthSpec

* Address Michael's comments

* Remove redundant check

* Fix warning

* Fix failing test

Co-authored-by: Michael Sproul <micsproul@gmail.com>
Co-authored-by: Pawan Dhananjay <pawandhananjay@gmail.com>
This commit is contained in:
Paul Hauner
2020-03-05 17:19:35 +11:00
committed by GitHub
parent c141f1cc03
commit 6656cb00e4
38 changed files with 1226 additions and 344 deletions

View File

@@ -6,20 +6,21 @@ use crate::fork_choice::{Error as ForkChoiceError, ForkChoice};
use crate::head_tracker::HeadTracker;
use crate::metrics;
use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY};
use crate::shuffling_cache::ShufflingCache;
use crate::timeout_rw_lock::TimeoutRwLock;
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use operation_pool::{OperationPool, PersistedOperationPool};
use slog::{debug, error, info, trace, warn, Logger};
use slot_clock::SlotClock;
use ssz::Encode;
use state_processing::per_block_processing::{
errors::{
AttestationValidationError, AttesterSlashingValidationError, ExitValidationError,
ProposerSlashingValidationError,
},
verify_attestation_for_state, VerifySignatures,
use state_processing::per_block_processing::errors::{
AttestationValidationError, AttesterSlashingValidationError, ExitValidationError,
ProposerSlashingValidationError,
};
use state_processing::{
per_block_processing, per_slot_processing, BlockProcessingError, BlockSignatureStrategy,
common::get_indexed_attestation, per_block_processing, per_slot_processing,
signature_sets::indexed_attestation_signature_set_from_pubkeys, BlockProcessingError,
BlockSignatureStrategy,
};
use std::borrow::Cow;
use std::cmp::Ordering;
@@ -53,6 +54,14 @@ const MAXIMUM_BLOCK_SLOT_NUMBER: u64 = 4_294_967_296; // 2^32
/// head.
const HEAD_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
/// The time-out before failure during an operation to take a read/write RwLock on the
/// attestation cache.
const ATTESTATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
/// The time-out before failure during an operation to take a read/write RwLock on the
/// validator pubkey cache.
const VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
#[derive(Debug, PartialEq)]
pub enum BlockProcessingOutcome {
/// Block was valid and imported into the block graph.
@@ -93,14 +102,29 @@ pub enum AttestationProcessingOutcome {
},
/// The attestation is attesting to a state that is later than itself. (Viz., attesting to the
/// future).
AttestsToFutureState {
state: Slot,
AttestsToFutureBlock {
block: Slot,
attestation: Slot,
},
/// The slot is finalized, no need to import.
FinalizedSlot {
attestation: Epoch,
finalized: Epoch,
attestation: Slot,
finalized: Slot,
},
FutureEpoch {
attestation_epoch: Epoch,
current_epoch: Epoch,
},
PastEpoch {
attestation_epoch: Epoch,
current_epoch: Epoch,
},
BadTargetEpoch,
UnknownTargetRoot(Hash256),
InvalidSignature,
NoCommitteeForSlotAndIndex {
slot: Slot,
index: CommitteeIndex,
},
Invalid(AttestationValidationError),
}
@@ -161,6 +185,10 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub event_handler: T::EventHandler,
/// Used to track the heads of the beacon chain.
pub(crate) head_tracker: HeadTracker,
/// Caches the shuffling for a given epoch and state root.
pub(crate) shuffling_cache: TimeoutRwLock<ShufflingCache>,
/// Caches a map of `validator_index -> validator_pubkey`.
pub(crate) validator_pubkey_cache: TimeoutRwLock<ValidatorPubkeyCache>,
/// Logging to CLI, etc.
pub(crate) log: Logger,
}
@@ -710,11 +738,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
attestation: Attestation<T::EthSpec>,
) -> Result<AttestationProcessingOutcome, Error> {
metrics::inc_counter(&metrics::ATTESTATION_PROCESSING_REQUESTS);
let timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_TIMES);
let outcome = self.process_attestation_internal(attestation.clone());
match &outcome {
Ok(outcome) => match outcome {
AttestationProcessingOutcome::Processed => {
metrics::inc_counter(&metrics::ATTESTATION_PROCESSING_SUCCESSES);
trace!(
self.log,
"Beacon attestation imported";
@@ -756,6 +788,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
metrics::stop_timer(timer);
outcome
}
@@ -763,216 +796,253 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
attestation: Attestation<T::EthSpec>,
) -> Result<AttestationProcessingOutcome, Error> {
metrics::inc_counter(&metrics::ATTESTATION_PROCESSING_REQUESTS);
let timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_TIMES);
let initial_validation_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_INITIAL_VALIDATION_TIMES);
// There is no point in processing an attestation with an empty bitfield. Reject
// it immediately.
if attestation.aggregation_bits.num_set_bits() == 0 {
return Ok(AttestationProcessingOutcome::EmptyAggregationBitfield);
}
// From the store, load the attestation's "head block".
//
// An honest validator would have set this block to be the head of the chain (i.e., the
// result of running fork choice).
let result = if let Some(attestation_head_block) =
self.get_block(&attestation.data.beacon_block_root)?
{
// If the attestation points to a block in the same epoch in which it was made,
// then it is sufficient to load the state from that epoch's boundary, because
// the epoch-variable fields like the justified checkpoints cannot have changed
// between the epoch boundary and when the attestation was made. If conversely,
// the attestation points to a block in a prior epoch, then it is necessary to
// load the full state corresponding to its block, and transition it to the
// attestation's epoch.
let attestation_block_root = attestation_head_block.state_root();
let attestation_epoch = attestation.data.target.epoch;
let slots_per_epoch = T::EthSpec::slots_per_epoch();
let mut state = if attestation_epoch
== attestation_head_block.slot().epoch(slots_per_epoch)
{
self.store
.load_epoch_boundary_state(&attestation_block_root)?
.ok_or_else(|| Error::MissingBeaconState(attestation_block_root))?
} else {
let mut state = self
.get_state_caching_only_with_committee_caches(
&attestation_head_block.state_root(),
Some(attestation_head_block.slot()),
)?
.ok_or_else(|| {
Error::MissingBeaconState(attestation_head_block.state_root())
})?;
let attestation_epoch = attestation.data.slot.epoch(T::EthSpec::slots_per_epoch());
let epoch_now = self.epoch()?;
let target = attestation.data.target.clone();
// Fastforward the state to the epoch in which the attestation was made.
// NOTE: this looks like a potential DoS vector, we should probably limit
// the amount we're willing to fastforward without a valid signature.
for _ in state.slot.as_u64()..attestation_epoch.start_slot(slots_per_epoch).as_u64()
{
// Note: we provide the zero hash as the state root because the state root is
// irrelevant to attestation processing and therefore a waste of time to
// compute.
per_slot_processing(&mut state, Some(Hash256::zero()), &self.spec)?;
}
state
};
state.build_committee_cache(RelativeEpoch::Current, &self.spec)?;
// Reject any attestation where the `state` loaded from `data.beacon_block_root`
// has a higher slot than the attestation.
//
// Permitting this would allow for attesters to vote on _future_ slots.
if state.slot > attestation.data.slot {
Ok(AttestationProcessingOutcome::AttestsToFutureState {
state: state.slot,
attestation: attestation.data.slot,
})
} else {
self.process_attestation_for_state_and_block(
attestation,
&state,
&attestation_head_block.message,
)
}
} else {
// Drop any attestation where we have not processed `attestation.data.beacon_block_root`.
//
// This is likely overly restrictive, we could store the attestation for later
// processing.
let head_epoch = self.head_info()?.slot.epoch(T::EthSpec::slots_per_epoch());
let attestation_epoch = attestation.data.slot.epoch(T::EthSpec::slots_per_epoch());
// Only log a warning if our head is in a reasonable place to verify this attestation.
// This avoids excess logging during syncing.
if head_epoch + 1 >= attestation_epoch {
trace!(
self.log,
"Dropped attestation for unknown block";
"block" => format!("{}", attestation.data.beacon_block_root)
);
} else {
trace!(
self.log,
"Dropped attestation for unknown block";
"block" => format!("{}", attestation.data.beacon_block_root)
);
}
Ok(AttestationProcessingOutcome::UnknownHeadBlock {
beacon_block_root: attestation.data.beacon_block_root,
})
};
metrics::stop_timer(timer);
if let Ok(AttestationProcessingOutcome::Processed) = &result {
metrics::inc_counter(&metrics::ATTESTATION_PROCESSING_SUCCESSES);
// Attestation must be from the current or previous epoch.
if attestation_epoch > epoch_now {
return Ok(AttestationProcessingOutcome::FutureEpoch {
attestation_epoch,
current_epoch: epoch_now,
});
} else if attestation_epoch + 1 < epoch_now {
return Ok(AttestationProcessingOutcome::PastEpoch {
attestation_epoch,
current_epoch: epoch_now,
});
}
result
}
if target.epoch != attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()) {
return Ok(AttestationProcessingOutcome::BadTargetEpoch);
}
/// Verifies the `attestation` against the `state` to which it is attesting.
///
/// Updates fork choice with any new latest messages, but _does not_ find or update the head.
///
/// ## Notes
///
/// The given `state` must fulfil one of the following conditions:
///
/// - `state` corresponds to the `block.state_root` identified by
/// `attestation.data.beacon_block_root`. (Viz., `attestation` was created using `state`).
/// - `state.slot` is in the same epoch as `data.target.epoch` and
/// `attestation.data.beacon_block_root` is in the history of `state`.
///
/// Additionally, `attestation.data.beacon_block_root` **must** be available to read in
/// `self.store` _and_ be the root of the given `block`.
///
/// If the given conditions are not fulfilled, the function may error or provide a false
/// negative (indicating that a given `attestation` is invalid when it is was validly formed).
fn process_attestation_for_state_and_block(
&self,
attestation: Attestation<T::EthSpec>,
state: &BeaconState<T::EthSpec>,
block: &BeaconBlock<T::EthSpec>,
) -> Result<AttestationProcessingOutcome, Error> {
// Find the highest between:
// Attestation target must be for a known block.
//
// - The highest valid finalized epoch we've ever seen (i.e., the head).
// - The finalized epoch that this attestation was created against.
let finalized_epoch = std::cmp::max(
self.head_info()?.finalized_checkpoint.epoch,
state.finalized_checkpoint.epoch,
);
// A helper function to allow attestation processing to be metered.
let verify_attestation_for_state = |state, attestation, spec, verify_signatures| {
let timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_CORE);
let result = verify_attestation_for_state(state, attestation, spec, verify_signatures);
metrics::stop_timer(timer);
result
// We use fork choice to find the target root, which means that we reject any attestation
// that has a `target.root` earlier than our latest finalized root. There's no point in
// processing an attestation that does not include our latest finalized block in its chain.
//
// We do not delay consideration for later, we simply drop the attestation.
let (target_block_slot, target_block_state_root) = if let Some((slot, state_root)) =
self.fork_choice.block_slot_and_state_root(&target.root)
{
(slot, state_root)
} else {
return Ok(AttestationProcessingOutcome::UnknownTargetRoot(target.root));
};
if block.slot > 0 && block.slot <= finalized_epoch.start_slot(T::EthSpec::slots_per_epoch())
// Load the slot and state root for `attestation.data.beacon_block_root`.
//
// This indirectly checks to see if the `attestation.data.beacon_block_root` is in our fork
// choice. Any known, non-finalized block should be in fork choice, so this check
// immediately filters out attestations that attest to a block that has not been processed.
//
// Attestations must be for a known block. If the block is unknown, we simply drop the
// attestation and do not delay consideration for later.
let block_slot = if let Some((slot, _state_root)) = self
.fork_choice
.block_slot_and_state_root(&attestation.data.beacon_block_root)
{
// Ignore any attestation where the slot of `data.beacon_block_root` is equal to or
// prior to the finalized epoch.
//
// For any valid attestation if the `beacon_block_root` is prior to finalization, then
// all other parameters (source, target, etc) must all be prior to finalization and
// therefore no longer interesting.
//
// We allow the case where the block is the genesis block. Without this, all
// attestations prior to the first block being produced would be invalid.
Ok(AttestationProcessingOutcome::FinalizedSlot {
attestation: block.slot.epoch(T::EthSpec::slots_per_epoch()),
finalized: finalized_epoch,
})
} else if let Err(e) =
verify_attestation_for_state(state, &attestation, VerifySignatures::True, &self.spec)
{
warn!(
self.log,
"Invalid attestation";
"state_epoch" => state.current_epoch(),
"error" => format!("{:?}", e),
);
Ok(AttestationProcessingOutcome::Invalid(e))
slot
} else {
// If the attestation is from the current or previous epoch, supply it to the fork
// choice. This is FMD GHOST.
let current_epoch = self.epoch()?;
if attestation.data.target.epoch == current_epoch
|| attestation.data.target.epoch == current_epoch - 1
{
// Provide the attestation to fork choice, updating the validator latest messages but
// _without_ finding and updating the head.
if let Err(e) = self.fork_choice.process_attestation(&state, &attestation) {
error!(
self.log,
"Add attestation to fork choice failed";
"beacon_block_root" => format!("{}", attestation.data.beacon_block_root),
"error" => format!("{:?}", e)
);
return Err(e.into());
return Ok(AttestationProcessingOutcome::UnknownHeadBlock {
beacon_block_root: attestation.data.beacon_block_root,
});
};
// TODO: currently we do not check the FFG source/target. This is what the spec dictates
// but it seems wrong.
//
// I have opened an issue on the specs repo for this:
//
// https://github.com/ethereum/eth2.0-specs/issues/1636
//
// We should revisit this code once that issue has been resolved.
// Attestations must not be for blocks in the future. If this is the case, the attestation
// should not be considered.
if block_slot > attestation.data.slot {
return Ok(AttestationProcessingOutcome::AttestsToFutureBlock {
block: block_slot,
attestation: attestation.data.slot,
});
}
metrics::stop_timer(initial_validation_timer);
let cache_wait_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SHUFFLING_CACHE_WAIT_TIMES);
let mut shuffling_cache = self
.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or_else(|| Error::AttestationCacheLockTimeout)?;
metrics::stop_timer(cache_wait_timer);
let indexed_attestation =
if let Some(committee_cache) = shuffling_cache.get(attestation_epoch, target.root) {
if let Some(committee) = committee_cache
.get_beacon_committee(attestation.data.slot, attestation.data.index)
{
let indexed_attestation =
get_indexed_attestation(committee.committee, &attestation)?;
// Drop the shuffling cache to avoid holding the lock for any longer than
// required.
drop(shuffling_cache);
indexed_attestation
} else {
return Ok(AttestationProcessingOutcome::NoCommitteeForSlotAndIndex {
slot: attestation.data.slot,
index: attestation.data.index,
});
}
} else {
// Drop the shuffling cache to avoid holding the lock for any longer than
// required.
drop(shuffling_cache);
debug!(
self.log,
"Attestation processing cache miss";
"attn_epoch" => attestation_epoch.as_u64(),
"head_block_epoch" => block_slot.epoch(T::EthSpec::slots_per_epoch()).as_u64(),
);
let state_read_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_READ_TIMES);
let mut state = self
.get_state_caching_only_with_committee_caches(
&target_block_state_root,
Some(target_block_slot),
)?
.ok_or_else(|| Error::MissingBeaconState(target_block_state_root))?;
metrics::stop_timer(state_read_timer);
let state_skip_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_SKIP_TIMES);
while state.current_epoch() + 1 < attestation_epoch {
// Here we tell `per_slot_processing` to skip hashing the state and just
// use the zero hash instead.
//
// The state roots are not useful for the shuffling, so there's no need to
// compute them.
per_slot_processing(&mut state, Some(Hash256::zero()), &self.spec)?
}
metrics::stop_timer(state_skip_timer);
let committee_building_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_COMMITTEE_BUILDING_TIMES);
let relative_epoch =
RelativeEpoch::from_epoch(state.current_epoch(), attestation_epoch)
.map_err(Error::IncorrectStateForAttestation)?;
state.build_committee_cache(relative_epoch, &self.spec)?;
let committee_cache = state.committee_cache(relative_epoch)?;
self.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or_else(|| Error::AttestationCacheLockTimeout)?
.insert(attestation_epoch, target.root, committee_cache);
metrics::stop_timer(committee_building_timer);
if let Some(committee) = committee_cache
.get_beacon_committee(attestation.data.slot, attestation.data.index)
{
get_indexed_attestation(committee.committee, &attestation)?
} else {
return Ok(AttestationProcessingOutcome::NoCommitteeForSlotAndIndex {
slot: attestation.data.slot,
index: attestation.data.index,
});
}
};
let signature_setup_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SIGNATURE_SETUP_TIMES);
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or_else(|| Error::ValidatorPubkeyCacheLockTimeout)?;
let pubkeys = indexed_attestation
.attesting_indices
.iter()
.map(|i| {
pubkey_cache
.get(*i as usize)
.ok_or_else(|| Error::ValidatorPubkeyCacheIncomplete(*i as usize))
})
.collect::<Result<Vec<&PublicKey>, Error>>()?;
let fork = self
.canonical_head
.try_read_for(HEAD_LOCK_TIMEOUT)
.ok_or_else(|| Error::CanonicalHeadLockTimeout)
.map(|head| head.beacon_state.fork.clone())?;
let signature_set = indexed_attestation_signature_set_from_pubkeys(
pubkeys,
&attestation.signature,
&indexed_attestation,
&fork,
&self.spec,
)
.map_err(Error::SignatureSetError)?;
metrics::stop_timer(signature_setup_timer);
let signature_verification_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SIGNATURE_TIMES);
let signature_is_valid = signature_set.is_valid();
metrics::stop_timer(signature_verification_timer);
if signature_is_valid {
// Provide the attestation to fork choice, updating the validator latest messages but
// _without_ finding and updating the head.
if let Err(e) = self
.fork_choice
.process_indexed_attestation(&indexed_attestation)
{
error!(
self.log,
"Add attestation to fork choice failed";
"beacon_block_root" => format!("{}", attestation.data.beacon_block_root),
"error" => format!("{:?}", e)
);
return Err(e.into());
}
// Provide the valid attestation to op pool, which may choose to retain the
// attestation for inclusion in a future block.
if self.eth1_chain.is_some() {
self.op_pool
.insert_attestation(attestation, state, &self.spec)?;
.insert_attestation(attestation, &fork, &self.spec)?;
};
// Update the metrics.
metrics::inc_counter(&metrics::ATTESTATION_PROCESSING_SUCCESSES);
Ok(AttestationProcessingOutcome::Processed)
} else {
Ok(AttestationProcessingOutcome::InvalidSignature)
}
}
@@ -1289,6 +1359,41 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let fork_choice_register_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_FORK_CHOICE_REGISTER);
// If there are new validators in this block, update our pubkey cache.
//
// We perform this _before_ adding the block to fork choice because the pubkey cache is
// used by attestation processing which will only process an attestation if the block is
// known to fork choice. This ordering ensure that the pubkey cache is always up-to-date.
self.validator_pubkey_cache
.try_write_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or_else(|| Error::ValidatorPubkeyCacheLockTimeout)?
.import_new_pubkeys(&state)?;
// If the imported block is in the previous or current epochs (according to the
// wall-clock), check to see if this is the first block of the epoch. If so, add the
// committee to the shuffling cache.
if state.current_epoch() + 1 >= self.epoch()?
&& parent_block.slot().epoch(T::EthSpec::slots_per_epoch()) != state.current_epoch()
{
let mut shuffling_cache = self
.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or_else(|| Error::AttestationCacheLockTimeout)?;
let committee_cache = state.committee_cache(RelativeEpoch::Current)?;
let epoch_start_slot = state
.current_epoch()
.start_slot(T::EthSpec::slots_per_epoch());
let target_root = if state.slot == epoch_start_slot {
block_root
} else {
*state.get_block_root(epoch_start_slot)?
};
shuffling_cache.insert(state.current_epoch(), target_root, committee_cache);
}
// Register the new block with the fork choice service.
if let Err(e) = self
.fork_choice