Add attestation gossip pre-verification (#983)

* Add PH & MS slot clock changes

* Account for genesis time

* Add progress on duties refactor

* Add simple is_aggregator bool to val subscription

* Start work on attestation_verification.rs

* Add progress on ObservedAttestations

* Progress with ObservedAttestations

* Fix tests

* Add observed attestations to the beacon chain

* Add attestation observation to processing code

* Add progress on attestation verification

* Add first draft of ObservedAttesters

* Add more tests

* Add observed attesters to beacon chain

* Add observers to attestation processing

* Add more attestation verification

* Create ObservedAggregators map

* Remove commented-out code

* Add observed aggregators into chain

* Add progress

* Finish adding features to attestation verification

* Ensure beacon chain compiles

* Link attn verification into chain

* Integrate new attn verification in chain

* Remove old attestation processing code

* Start trying to fix beacon_chain tests

* Split adding into pools into two functions

* Add aggregation to harness

* Get test harness working again

* Adjust the number of aggregators for test harness

* Fix edge-case in harness

* Integrate new attn processing in network

* Fix compile bug in validator_client

* Update validator API endpoints

* Fix aggreagation in test harness

* Fix enum thing

* Fix attestation observation bug:

* Patch failing API tests

* Start adding comments to attestation verification

* Remove unused attestation field

* Unify "is block known" logic

* Update comments

* Supress fork choice errors for network processing

* Add todos

* Tidy

* Add gossip attn tests

* Disallow test harness to produce old attns

* Comment out in-progress tests

* Partially address pruning tests

* Fix failing store test

* Add aggregate tests

* Add comments about which spec conditions we check

* Dont re-aggregate

* Split apart test harness attn production

* Fix compile error in network

* Make progress on commented-out test

* Fix skipping attestation test

* Add fork choice verification tests

* Tidy attn tests, remove dead code

* Remove some accidentally added code

* Fix clippy lint

* Rename test file

* Add block tests, add cheap block proposer check

* Rename block testing file

* Add observed_block_producers

* Tidy

* Switch around block signature verification

* Finish block testing

* Remove gossip from signature tests

* First pass of self review

* Fix deviation in spec

* Update test spec tags

* Start moving over to hashset

* Finish moving observed attesters to hashmap

* Move aggregation pool over to hashmap

* Make fc attn borrow again

* Fix rest_api compile error

* Fix missing comments

* Fix monster test

* Uncomment increasing slots test

* Address remaining comments

* Remove unsafe, use cfg test

* Remove cfg test flag

* Fix dodgy comment

* Ignore aggregates that are already known.

* Unify aggregator modulo logic

* Fix typo in logs

* Refactor validator subscription logic

* Avoid reproducing selection proof

* Skip HTTP call if no subscriptions

* Rename DutyAndState -> DutyAndProof

* Tidy logs

* Print root as dbg

* Fix compile errors in tests

* Fix compile error in test
This commit is contained in:
Paul Hauner
2020-05-06 21:42:56 +10:00
committed by GitHub
parent 1552f9997e
commit ad5bd6412a
38 changed files with 4952 additions and 1479 deletions

View File

@@ -0,0 +1,886 @@
//! Provides verification for the following attestations:
//!
//! - "Unaggregated" `Attestation` received from either gossip or the HTTP API.
//! - "Aggregated" `SignedAggregateAndProof` received from gossip or the HTTP API.
//!
//! For clarity, we define:
//!
//! - Unaggregated: an `Attestation` object that has exactly one aggregation bit set.
//! - Aggregated: a `SignedAggregateAndProof` which has zero or more signatures.
//! - Note: "zero or more" may soon change to "one or more".
//!
//! Similar to the `crate::block_verification` module, we try to avoid doing duplicate verification
//! work as an attestation passes through different stages of verification. We represent these
//! different stages of verification with wrapper types. These wrapper-types flow in a particular
//! pattern:
//!
//! ```ignore
//! types::Attestation types::SignedAggregateAndProof
//! | |
//! ▼ ▼
//! VerifiedUnaggregatedAttestation VerifiedAggregatedAttestation
//! | |
//! -------------------------------------
//! |
//! ▼
//! ForkChoiceVerifiedAttestation
//! ```
use crate::{
beacon_chain::{
ATTESTATION_CACHE_LOCK_TIMEOUT, HEAD_LOCK_TIMEOUT, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT,
},
metrics,
observed_attestations::ObserveOutcome,
observed_attesters::Error as ObservedAttestersError,
BeaconChain, BeaconChainError, BeaconChainTypes,
};
use bls::verify_signature_sets;
use slog::debug;
use slot_clock::SlotClock;
use state_processing::{
common::get_indexed_attestation,
per_block_processing::errors::AttestationValidationError,
per_slot_processing,
signature_sets::{
indexed_attestation_signature_set_from_pubkeys,
signed_aggregate_selection_proof_signature_set, signed_aggregate_signature_set,
},
};
use std::borrow::Cow;
use tree_hash::TreeHash;
use types::{
Attestation, BeaconCommittee, CommitteeIndex, Epoch, EthSpec, Hash256, IndexedAttestation,
RelativeEpoch, SelectionProof, SignedAggregateAndProof, Slot,
};
/// Returned when an attestation was not successfully verified. It might not have been verified for
/// two reasons:
///
/// - The attestation is malformed or inappropriate for the context (indicated by all variants
/// other than `BeaconChainError`).
/// - The application encountered an internal error whilst attempting to determine validity
/// (the `BeaconChainError` variant)
#[derive(Debug, PartialEq)]
pub enum Error {
/// The attestation is from a slot that is later than the current slot (with respect to the
/// gossip clock disparity).
FutureSlot {
attestation_slot: Slot,
latest_permissible_slot: Slot,
},
/// The attestation is from a slot that is prior to the earliest permissible slot (with
/// respect to the gossip clock disparity).
PastSlot {
attestation_slot: Slot,
earliest_permissible_slot: Slot,
},
/// The attestations aggregation bits were empty when they shouldn't be.
EmptyAggregationBitfield,
/// The `selection_proof` on the aggregate attestation does not elect it as an aggregator.
InvalidSelectionProof { aggregator_index: u64 },
/// The `selection_proof` on the aggregate attestation selects it as a validator, however the
/// aggregator index is not in the committee for that attestation.
AggregatorNotInCommittee { aggregator_index: u64 },
/// The aggregator index refers to a validator index that we have not seen.
AggregatorPubkeyUnknown(u64),
/// The attestation has been seen before; either in a block, on the gossip network or from a
/// local validator.
AttestationAlreadyKnown(Hash256),
/// There has already been an aggregation observed for this validator, we refuse to process a
/// second.
AggregatorAlreadyKnown(u64),
/// The aggregator index is higher than the maximum possible validator count.
ValidatorIndexTooHigh(usize),
/// The `attestation.data.beacon_block_root` block is unknown.
UnknownHeadBlock { beacon_block_root: Hash256 },
/// The `attestation.data.slot` is not from the same epoch as `data.target.epoch` and therefore
/// the attestation is invalid.
BadTargetEpoch,
/// The target root of the attestation points to a block that we have not verified.
UnknownTargetRoot(Hash256),
/// A signature on the attestation is invalid.
InvalidSignature,
/// There is no committee for the slot and committee index of this attestation and the
/// attestation should not have been produced.
NoCommitteeForSlotAndIndex { slot: Slot, index: CommitteeIndex },
/// The unaggregated attestation doesn't have only one aggregation bit set.
NotExactlyOneAggregationBitSet(usize),
/// We have already observed an attestation for the `validator_index` and refuse to process
/// another.
PriorAttestationKnown { validator_index: u64, epoch: Epoch },
/// The attestation is for an epoch in the future (with respect to the gossip clock disparity).
FutureEpoch {
attestation_epoch: Epoch,
current_epoch: Epoch,
},
/// The attestation is for an epoch in the past (with respect to the gossip clock disparity).
PastEpoch {
attestation_epoch: Epoch,
current_epoch: Epoch,
},
/// The attestation is attesting to a state that is later than itself. (Viz., attesting to the
/// future).
AttestsToFutureBlock { block: Slot, attestation: Slot },
/// The attestation failed the `state_processing` verification stage.
Invalid(AttestationValidationError),
/// There was an error whilst processing the attestation. It is not known if it is valid or invalid.
BeaconChainError(BeaconChainError),
}
impl From<BeaconChainError> for Error {
fn from(e: BeaconChainError) -> Self {
Error::BeaconChainError(e)
}
}
/// Wraps a `SignedAggregateAndProof` that has been verified for propagation on the gossip network.
pub struct VerifiedAggregatedAttestation<T: BeaconChainTypes> {
signed_aggregate: SignedAggregateAndProof<T::EthSpec>,
indexed_attestation: IndexedAttestation<T::EthSpec>,
}
/// Wraps an `Attestation` that has been verified for propagation on the gossip network.
pub struct VerifiedUnaggregatedAttestation<T: BeaconChainTypes> {
attestation: Attestation<T::EthSpec>,
indexed_attestation: IndexedAttestation<T::EthSpec>,
}
/// Custom `Clone` implementation is to avoid the restrictive trait bounds applied by the usual derive
/// macro.
impl<T: BeaconChainTypes> Clone for VerifiedUnaggregatedAttestation<T> {
fn clone(&self) -> Self {
Self {
attestation: self.attestation.clone(),
indexed_attestation: self.indexed_attestation.clone(),
}
}
}
/// Wraps an `indexed_attestation` that is valid for application to fork choice. The
/// `indexed_attestation` will have been generated via the `VerifiedAggregatedAttestation` or
/// `VerifiedUnaggregatedAttestation` wrappers.
pub struct ForkChoiceVerifiedAttestation<'a, T: BeaconChainTypes> {
indexed_attestation: &'a IndexedAttestation<T::EthSpec>,
}
/// A helper trait implemented on wrapper types that can be progressed to a state where they can be
/// verified for application to fork choice.
pub trait IntoForkChoiceVerifiedAttestation<'a, T: BeaconChainTypes> {
fn into_fork_choice_verified_attestation(
&'a self,
chain: &BeaconChain<T>,
) -> Result<ForkChoiceVerifiedAttestation<'a, T>, Error>;
}
impl<'a, T: BeaconChainTypes> IntoForkChoiceVerifiedAttestation<'a, T>
for VerifiedAggregatedAttestation<T>
{
/// Progresses the `VerifiedAggregatedAttestation` to a stage where it is valid for application
/// to the fork-choice rule (or not).
fn into_fork_choice_verified_attestation(
&'a self,
chain: &BeaconChain<T>,
) -> Result<ForkChoiceVerifiedAttestation<T>, Error> {
ForkChoiceVerifiedAttestation::from_signature_verified_components(
&self.indexed_attestation,
chain,
)
}
}
impl<'a, T: BeaconChainTypes> IntoForkChoiceVerifiedAttestation<'a, T>
for VerifiedUnaggregatedAttestation<T>
{
/// Progresses the `Attestation` to a stage where it is valid for application to the
/// fork-choice rule (or not).
fn into_fork_choice_verified_attestation(
&'a self,
chain: &BeaconChain<T>,
) -> Result<ForkChoiceVerifiedAttestation<T>, Error> {
ForkChoiceVerifiedAttestation::from_signature_verified_components(
&self.indexed_attestation,
chain,
)
}
}
impl<'a, T: BeaconChainTypes> IntoForkChoiceVerifiedAttestation<'a, T>
for ForkChoiceVerifiedAttestation<'a, T>
{
/// Simply returns itself.
fn into_fork_choice_verified_attestation(
&'a self,
_: &BeaconChain<T>,
) -> Result<ForkChoiceVerifiedAttestation<T>, Error> {
Ok(Self {
indexed_attestation: self.indexed_attestation,
})
}
}
impl<T: BeaconChainTypes> VerifiedAggregatedAttestation<T> {
/// Returns `Ok(Self)` if the `signed_aggregate` is valid to be (re)published on the gossip
/// network.
pub fn verify(
signed_aggregate: SignedAggregateAndProof<T::EthSpec>,
chain: &BeaconChain<T>,
) -> Result<Self, Error> {
let attestation = &signed_aggregate.message.aggregate;
// Ensure attestation is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (within a
// MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance).
//
// We do not queue future attestations for later processing.
verify_propagation_slot_range(chain, attestation)?;
// Ensure the aggregated attestation has not already been seen locally.
//
// TODO: this part of the code is not technically to spec, however I have raised a PR to
// change it:
//
// https://github.com/ethereum/eth2.0-specs/pull/1749
let attestation_root = attestation.tree_hash_root();
if chain
.observed_attestations
.is_known(attestation, attestation_root)
.map_err(|e| Error::BeaconChainError(e.into()))?
{
return Err(Error::AttestationAlreadyKnown(attestation_root));
}
let aggregator_index = signed_aggregate.message.aggregator_index;
// Ensure there has been no other observed aggregate for the given `aggregator_index`.
//
// Note: do not observe yet, only observe once the attestation has been verfied.
match chain
.observed_aggregators
.validator_has_been_observed(attestation, aggregator_index as usize)
{
Ok(true) => Err(Error::AggregatorAlreadyKnown(aggregator_index)),
Ok(false) => Ok(()),
Err(ObservedAttestersError::ValidatorIndexTooHigh(i)) => {
Err(Error::ValidatorIndexTooHigh(i))
}
Err(e) => Err(BeaconChainError::from(e).into()),
}?;
// Ensure the block being voted for (attestation.data.beacon_block_root) passes validation.
//
// This indirectly checks to see if the `attestation.data.beacon_block_root` is in our fork
// choice. Any known, non-finalized, processed block should be in fork choice, so this
// check immediately filters out attestations that attest to a block that has not been
// processed.
//
// Attestations must be for a known block. If the block is unknown, we simply drop the
// attestation and do not delay consideration for later.
verify_head_block_is_known(chain, &attestation)?;
let indexed_attestation = map_attestation_committee(chain, attestation, |committee| {
// Note: this clones the signature which is known to be a relatively slow operation.
//
// Future optimizations should remove this clone.
let selection_proof =
SelectionProof::from(signed_aggregate.message.selection_proof.clone());
if !selection_proof
.is_aggregator(committee.committee.len(), &chain.spec)
.map_err(|e| Error::BeaconChainError(e.into()))?
{
return Err(Error::InvalidSelectionProof { aggregator_index });
}
/*
* I have raised a PR that will likely get merged in v0.12.0:
*
* https://github.com/ethereum/eth2.0-specs/pull/1732
*
* If this PR gets merged, uncomment this code and remove the code below.
*
if !committee
.committee
.iter()
.any(|validator_index| *validator_index as u64 == aggregator_index)
{
return Err(Error::AggregatorNotInCommittee { aggregator_index });
}
*/
get_indexed_attestation(committee.committee, &attestation)
.map_err(|e| BeaconChainError::from(e).into())
})?;
// Ensure the aggregator is in the attestation.
//
// I've raised an issue with this here:
//
// https://github.com/ethereum/eth2.0-specs/pull/1732
//
// I suspect PR my will get merged in v0.12 and we'll need to delete this code and
// uncomment the code above.
if !indexed_attestation
.attesting_indices
.iter()
.any(|validator_index| *validator_index as u64 == aggregator_index)
{
return Err(Error::AggregatorNotInCommittee { aggregator_index });
}
if !verify_signed_aggregate_signatures(chain, &signed_aggregate, &indexed_attestation)? {
return Err(Error::InvalidSignature);
}
// Observe the valid attestation so we do not re-process it.
//
// It's important to double check that the attestation is not already known, otherwise two
// attestations processed at the same time could be published.
if let ObserveOutcome::AlreadyKnown = chain
.observed_attestations
.observe_attestation(attestation, Some(attestation_root))
.map_err(|e| Error::BeaconChainError(e.into()))?
{
return Err(Error::AttestationAlreadyKnown(attestation_root));
}
// Observe the aggregator so we don't process another aggregate from them.
//
// It's important to double check that the attestation is not already known, otherwise two
// attestations processed at the same time could be published.
if chain
.observed_aggregators
.observe_validator(&attestation, aggregator_index as usize)
.map_err(|e| BeaconChainError::from(e))?
{
return Err(Error::PriorAttestationKnown {
validator_index: aggregator_index,
epoch: attestation.data.target.epoch,
});
}
Ok(VerifiedAggregatedAttestation {
signed_aggregate,
indexed_attestation,
})
}
/// A helper function to add this aggregate to `beacon_chain.op_pool`.
pub fn add_to_pool(self, chain: &BeaconChain<T>) -> Result<Self, Error> {
chain.add_to_block_inclusion_pool(self)
}
/// A helper function to add this aggregate to `beacon_chain.fork_choice`.
pub fn add_to_fork_choice(
&self,
chain: &BeaconChain<T>,
) -> Result<ForkChoiceVerifiedAttestation<T>, Error> {
chain.apply_attestation_to_fork_choice(self)
}
/// Returns the underlying `attestation` for the `signed_aggregate`.
pub fn attestation(&self) -> &Attestation<T::EthSpec> {
&self.signed_aggregate.message.aggregate
}
}
impl<T: BeaconChainTypes> VerifiedUnaggregatedAttestation<T> {
/// Returns `Ok(Self)` if the `attestation` is valid to be (re)published on the gossip
/// network.
pub fn verify(
attestation: Attestation<T::EthSpec>,
chain: &BeaconChain<T>,
) -> Result<Self, Error> {
// Ensure attestation is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (within a
// MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance).
//
// We do not queue future attestations for later processing.
verify_propagation_slot_range(chain, &attestation)?;
// Check to ensure that the attestation is "unaggregated". I.e., it has exactly one
// aggregation bit set.
let num_aggreagtion_bits = attestation.aggregation_bits.num_set_bits();
if num_aggreagtion_bits != 1 {
return Err(Error::NotExactlyOneAggregationBitSet(num_aggreagtion_bits));
}
// Attestations must be for a known block. If the block is unknown, we simply drop the
// attestation and do not delay consideration for later.
verify_head_block_is_known(chain, &attestation)?;
let indexed_attestation = obtain_indexed_attestation(chain, &attestation)?;
let validator_index = *indexed_attestation
.attesting_indices
.first()
.ok_or_else(|| Error::NotExactlyOneAggregationBitSet(0))?;
/*
* The attestation is the first valid attestation received for the participating validator
* for the slot, attestation.data.slot.
*/
if chain
.observed_attesters
.validator_has_been_observed(&attestation, validator_index as usize)
.map_err(|e| BeaconChainError::from(e))?
{
return Err(Error::PriorAttestationKnown {
validator_index,
epoch: attestation.data.target.epoch,
});
}
// The aggregate signature of the attestation is valid.
verify_attestation_signature(chain, &indexed_attestation)?;
// Now that the attestation has been fully verified, store that we have received a valid
// attestation from this validator.
//
// It's important to double check that the attestation still hasn't been observed, since
// there can be a race-condition if we receive two attestations at the same time and
// process them in different threads.
if chain
.observed_attesters
.observe_validator(&attestation, validator_index as usize)
.map_err(|e| BeaconChainError::from(e))?
{
return Err(Error::PriorAttestationKnown {
validator_index,
epoch: attestation.data.target.epoch,
});
}
Ok(Self {
attestation,
indexed_attestation,
})
}
/// A helper function to add this attestation to `beacon_chain.naive_aggregation_pool`.
pub fn add_to_pool(self, chain: &BeaconChain<T>) -> Result<Self, Error> {
chain.add_to_naive_aggregation_pool(self)
}
/// Returns the wrapped `attestation`.
pub fn attestation(&self) -> &Attestation<T::EthSpec> {
&self.attestation
}
/// Returns a mutable reference to the underlying attestation.
///
/// Only use during testing since modifying the `IndexedAttestation` can cause the attestation
/// to no-longer be valid.
pub fn __indexed_attestation_mut(&mut self) -> &mut IndexedAttestation<T::EthSpec> {
&mut self.indexed_attestation
}
}
impl<'a, T: BeaconChainTypes> ForkChoiceVerifiedAttestation<'a, T> {
/// Returns `Ok(Self)` if the `attestation` is valid to be applied to the beacon chain fork
/// choice.
///
/// The supplied `indexed_attestation` MUST have a valid signature, this function WILL NOT
/// CHECK THE SIGNATURE. Use the `VerifiedAggregatedAttestation` or
/// `VerifiedUnaggregatedAttestation` structs to do signature verification.
fn from_signature_verified_components(
indexed_attestation: &'a IndexedAttestation<T::EthSpec>,
chain: &BeaconChain<T>,
) -> Result<Self, Error> {
// There is no point in processing an attestation with an empty bitfield. Reject
// it immediately.
//
// This is not in the specification, however it should be transparent to other nodes. We
// return early here to avoid wasting precious resources verifying the rest of it.
if indexed_attestation.attesting_indices.len() == 0 {
return Err(Error::EmptyAggregationBitfield);
}
let slot_now = chain.slot()?;
let epoch_now = slot_now.epoch(T::EthSpec::slots_per_epoch());
let target = indexed_attestation.data.target.clone();
// Attestation must be from the current or previous epoch.
if target.epoch > epoch_now {
return Err(Error::FutureEpoch {
attestation_epoch: target.epoch,
current_epoch: epoch_now,
});
} else if target.epoch + 1 < epoch_now {
return Err(Error::PastEpoch {
attestation_epoch: target.epoch,
current_epoch: epoch_now,
});
}
if target.epoch
!= indexed_attestation
.data
.slot
.epoch(T::EthSpec::slots_per_epoch())
{
return Err(Error::BadTargetEpoch);
}
// Attestation target must be for a known block.
if !chain.fork_choice.contains_block(&target.root) {
return Err(Error::UnknownTargetRoot(target.root));
}
// TODO: we're not testing an assert from the spec:
//
// `assert get_current_slot(store) >= compute_start_slot_at_epoch(target.epoch)`
//
// I think this check is redundant and I've raised an issue here:
//
// https://github.com/ethereum/eth2.0-specs/pull/1755
//
// To resolve this todo, observe the outcome of the above PR.
// Load the slot and state root for `attestation.data.beacon_block_root`.
//
// This indirectly checks to see if the `attestation.data.beacon_block_root` is in our fork
// choice. Any known, non-finalized block should be in fork choice, so this check
// immediately filters out attestations that attest to a block that has not been processed.
//
// Attestations must be for a known block. If the block is unknown, we simply drop the
// attestation and do not delay consideration for later.
let (block_slot, _state_root) = chain
.fork_choice
.block_slot_and_state_root(&indexed_attestation.data.beacon_block_root)
.ok_or_else(|| Error::UnknownHeadBlock {
beacon_block_root: indexed_attestation.data.beacon_block_root,
})?;
// TODO: currently we do not check the FFG source/target. This is what the spec dictates
// but it seems wrong.
//
// I have opened an issue on the specs repo for this:
//
// https://github.com/ethereum/eth2.0-specs/issues/1636
//
// We should revisit this code once that issue has been resolved.
// Attestations must not be for blocks in the future. If this is the case, the attestation
// should not be considered.
if block_slot > indexed_attestation.data.slot {
return Err(Error::AttestsToFutureBlock {
block: block_slot,
attestation: indexed_attestation.data.slot,
});
}
// Note: we're not checking the "attestations can only affect the fork choice of subsequent
// slots" part of the spec, we do this upstream.
Ok(Self {
indexed_attestation,
})
}
/// Returns the wrapped `IndexedAttestation`.
pub fn indexed_attestation(&self) -> &IndexedAttestation<T::EthSpec> {
&self.indexed_attestation
}
}
/// Returns `Ok(())` if the `attestation.data.beacon_block_root` is known to this chain.
///
/// The block root may not be known for two reasons:
///
/// 1. The block has never been verified by our application.
/// 2. The block is prior to the latest finalized block.
///
/// Case (1) is the exact thing we're trying to detect. However case (2) is a little different, but
/// it's still fine to reject here because there's no need for us to handle attestations that are
/// already finalized.
fn verify_head_block_is_known<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
attestation: &Attestation<T::EthSpec>,
) -> Result<(), Error> {
if chain
.fork_choice
.contains_block(&attestation.data.beacon_block_root)
{
Ok(())
} else {
Err(Error::UnknownHeadBlock {
beacon_block_root: attestation.data.beacon_block_root,
})
}
}
/// Verify that the `attestation` is within the acceptable gossip propagation range, with reference
/// to the current slot of the `chain`.
///
/// Accounts for `MAXIMUM_GOSSIP_CLOCK_DISPARITY`.
pub fn verify_propagation_slot_range<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
attestation: &Attestation<T::EthSpec>,
) -> Result<(), Error> {
let attestation_slot = attestation.data.slot;
let latest_permissible_slot = chain
.slot_clock
.now_with_future_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY)
.ok_or_else(|| BeaconChainError::UnableToReadSlot)?;
if attestation_slot > latest_permissible_slot {
return Err(Error::FutureSlot {
attestation_slot,
latest_permissible_slot,
});
}
// Taking advantage of saturating subtraction on `Slot`.
let earliest_permissible_slot = chain
.slot_clock
.now_with_past_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY)
.ok_or_else(|| BeaconChainError::UnableToReadSlot)?
- T::EthSpec::slots_per_epoch();
if attestation_slot < earliest_permissible_slot {
return Err(Error::PastSlot {
attestation_slot,
earliest_permissible_slot,
});
}
Ok(())
}
/// Verifies that the signature of the `indexed_attestation` is valid.
pub fn verify_attestation_signature<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
indexed_attestation: &IndexedAttestation<T::EthSpec>,
) -> Result<(), Error> {
let signature_setup_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SIGNATURE_SETUP_TIMES);
let pubkey_cache = chain
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or_else(|| BeaconChainError::ValidatorPubkeyCacheLockTimeout)?;
let fork = chain
.canonical_head
.try_read_for(HEAD_LOCK_TIMEOUT)
.ok_or_else(|| BeaconChainError::CanonicalHeadLockTimeout)
.map(|head| head.beacon_state.fork.clone())?;
let signature_set = indexed_attestation_signature_set_from_pubkeys(
|validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed),
&indexed_attestation.signature,
&indexed_attestation,
&fork,
chain.genesis_validators_root,
&chain.spec,
)
.map_err(BeaconChainError::SignatureSetError)?;
metrics::stop_timer(signature_setup_timer);
let _signature_verification_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SIGNATURE_TIMES);
if signature_set.is_valid() {
Ok(())
} else {
Err(Error::InvalidSignature)
}
}
/// Verifies all the signatures in a `SignedAggregateAndProof` using BLS batch verification. This
/// includes three signatures:
///
/// - `signed_aggregate.signature`
/// - `signed_aggregate.signature.message.selection proof`
/// - `signed_aggregate.signature.message.aggregate.signature`
///
/// # Returns
///
/// - `Ok(true)`: if all signatures are valid.
/// - `Ok(false)`: if one or more signatures are invalid.
/// - `Err(e)`: if there was an error preventing signature verification.
pub fn verify_signed_aggregate_signatures<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
signed_aggregate: &SignedAggregateAndProof<T::EthSpec>,
indexed_attestation: &IndexedAttestation<T::EthSpec>,
) -> Result<bool, Error> {
let pubkey_cache = chain
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or_else(|| BeaconChainError::ValidatorPubkeyCacheLockTimeout)?;
let aggregator_index = signed_aggregate.message.aggregator_index;
if aggregator_index >= pubkey_cache.len() as u64 {
return Err(Error::AggregatorPubkeyUnknown(aggregator_index));
}
let fork = chain
.canonical_head
.try_read_for(HEAD_LOCK_TIMEOUT)
.ok_or_else(|| BeaconChainError::CanonicalHeadLockTimeout)
.map(|head| head.beacon_state.fork.clone())?;
let signature_sets = vec![
signed_aggregate_selection_proof_signature_set(
|validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed),
&signed_aggregate,
&fork,
chain.genesis_validators_root,
&chain.spec,
)
.map_err(BeaconChainError::SignatureSetError)?,
signed_aggregate_signature_set(
|validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed),
&signed_aggregate,
&fork,
chain.genesis_validators_root,
&chain.spec,
)
.map_err(BeaconChainError::SignatureSetError)?,
indexed_attestation_signature_set_from_pubkeys(
|validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed),
&indexed_attestation.signature,
&indexed_attestation,
&fork,
chain.genesis_validators_root,
&chain.spec,
)
.map_err(BeaconChainError::SignatureSetError)?,
];
Ok(verify_signature_sets(signature_sets))
}
/// Returns the `indexed_attestation` for the `attestation` using the public keys cached in the
/// `chain`.
pub fn obtain_indexed_attestation<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
attestation: &Attestation<T::EthSpec>,
) -> Result<IndexedAttestation<T::EthSpec>, Error> {
map_attestation_committee(chain, attestation, |committee| {
get_indexed_attestation(committee.committee, &attestation)
.map_err(|e| BeaconChainError::from(e).into())
})
}
/// Runs the `map_fn` with the committee for the given `attestation`.
///
/// This function exists in this odd "map" pattern because efficiently obtaining the committee for
/// an attestation can be complex. It might involve reading straight from the
/// `beacon_chain.shuffling_cache` or it might involve reading it from a state from the DB. Due to
/// the complexities of `RwLock`s on the shuffling cache, a simple `Cow` isn't suitable here.
///
/// If the committee for `attestation` isn't found in the `shuffling_cache`, we will read a state
/// from disk and then update the `shuffling_cache`.
pub fn map_attestation_committee<'a, T, F, R>(
chain: &'a BeaconChain<T>,
attestation: &Attestation<T::EthSpec>,
map_fn: F,
) -> Result<R, Error>
where
T: BeaconChainTypes,
F: Fn(BeaconCommittee) -> Result<R, Error>,
{
let attestation_epoch = attestation.data.slot.epoch(T::EthSpec::slots_per_epoch());
let target = &attestation.data.target;
// Attestation target must be for a known block.
//
// We use fork choice to find the target root, which means that we reject any attestation
// that has a `target.root` earlier than our latest finalized root. There's no point in
// processing an attestation that does not include our latest finalized block in its chain.
//
// We do not delay consideration for later, we simply drop the attestation.
let (target_block_slot, target_block_state_root) = chain
.fork_choice
.block_slot_and_state_root(&target.root)
.ok_or_else(|| Error::UnknownTargetRoot(target.root))?;
// Obtain the shuffling cache, timing how long we wait.
let cache_wait_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SHUFFLING_CACHE_WAIT_TIMES);
let mut shuffling_cache = chain
.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or_else(|| BeaconChainError::AttestationCacheLockTimeout)?;
metrics::stop_timer(cache_wait_timer);
if let Some(committee_cache) = shuffling_cache.get(attestation_epoch, target.root) {
committee_cache
.get_beacon_committee(attestation.data.slot, attestation.data.index)
.map(map_fn)
.unwrap_or_else(|| {
Err(Error::NoCommitteeForSlotAndIndex {
slot: attestation.data.slot,
index: attestation.data.index,
})
})
} else {
// Drop the shuffling cache to avoid holding the lock for any longer than
// required.
drop(shuffling_cache);
debug!(
chain.log,
"Attestation processing cache miss";
"attn_epoch" => attestation_epoch.as_u64(),
"target_block_epoch" => target_block_slot.epoch(T::EthSpec::slots_per_epoch()).as_u64(),
);
let state_read_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_READ_TIMES);
let mut state = chain
.get_state(&target_block_state_root, Some(target_block_slot))?
.ok_or_else(|| BeaconChainError::MissingBeaconState(target_block_state_root))?;
metrics::stop_timer(state_read_timer);
let state_skip_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_SKIP_TIMES);
while state.current_epoch() + 1 < attestation_epoch {
// Here we tell `per_slot_processing` to skip hashing the state and just
// use the zero hash instead.
//
// The state roots are not useful for the shuffling, so there's no need to
// compute them.
per_slot_processing(&mut state, Some(Hash256::zero()), &chain.spec)
.map_err(|e| BeaconChainError::from(e))?
}
metrics::stop_timer(state_skip_timer);
let committee_building_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_COMMITTEE_BUILDING_TIMES);
let relative_epoch = RelativeEpoch::from_epoch(state.current_epoch(), attestation_epoch)
.map_err(BeaconChainError::IncorrectStateForAttestation)?;
state
.build_committee_cache(relative_epoch, &chain.spec)
.map_err(|e| BeaconChainError::from(e))?;
let committee_cache = state
.committee_cache(relative_epoch)
.map_err(|e| BeaconChainError::from(e))?;
chain
.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or_else(|| BeaconChainError::AttestationCacheLockTimeout)?
.insert(attestation_epoch, target.root, committee_cache);
metrics::stop_timer(committee_building_timer);
committee_cache
.get_beacon_committee(attestation.data.slot, attestation.data.index)
.map(map_fn)
.unwrap_or_else(|| {
Err(Error::NoCommitteeForSlotAndIndex {
slot: attestation.data.slot,
index: attestation.data.index,
})
})
}
}

View File

@@ -1,3 +1,7 @@
use crate::attestation_verification::{
Error as AttestationError, ForkChoiceVerifiedAttestation, IntoForkChoiceVerifiedAttestation,
VerifiedAggregatedAttestation, VerifiedUnaggregatedAttestation,
};
use crate::block_verification::{
check_block_relevancy, get_block_root, signature_verify_chain_segment, BlockError,
FullyVerifiedBlock, GossipVerifiedBlock, IntoFullyVerifiedBlock,
@@ -10,6 +14,9 @@ use crate::head_tracker::HeadTracker;
use crate::metrics;
use crate::migrate::Migrate;
use crate::naive_aggregation_pool::{Error as NaiveAggregationError, NaiveAggregationPool};
use crate::observed_attestations::{Error as AttestationObservationError, ObservedAttestations};
use crate::observed_attesters::{ObservedAggregators, ObservedAttesters};
use crate::observed_block_producers::ObservedBlockProducers;
use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::shuffling_cache::ShufflingCache;
use crate::snapshot_cache::SnapshotCache;
@@ -23,10 +30,7 @@ use state_processing::per_block_processing::errors::{
AttestationValidationError, AttesterSlashingValidationError, ExitValidationError,
ProposerSlashingValidationError,
};
use state_processing::{
common::get_indexed_attestation, per_block_processing, per_slot_processing,
signature_sets::indexed_attestation_signature_set_from_pubkeys, BlockSignatureStrategy,
};
use state_processing::{per_block_processing, per_slot_processing, BlockSignatureStrategy};
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::HashMap;
@@ -49,14 +53,14 @@ pub const GRAFFITI: &str = "sigp/lighthouse-0.2.0-prerelease";
/// The time-out before failure during an operation to take a read/write RwLock on the canonical
/// head.
const HEAD_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
pub const HEAD_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
/// The time-out before failure during an operation to take a read/write RwLock on the block
/// processing cache.
pub const BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
/// The time-out before failure during an operation to take a read/write RwLock on the
/// attestation cache.
const ATTESTATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
pub const ATTESTATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
/// The time-out before failure during an operation to take a read/write RwLock on the
/// validator pubkey cache.
@@ -67,23 +71,6 @@ pub const OP_POOL_DB_KEY: [u8; 32] = [0; 32];
pub const ETH1_CACHE_DB_KEY: [u8; 32] = [0; 32];
pub const FORK_CHOICE_DB_KEY: [u8; 32] = [0; 32];
#[derive(Debug, PartialEq)]
pub enum AttestationType {
/// An attestation with a single-signature that has been published in accordance with the naive
/// aggregation strategy.
///
/// These attestations may have come from a `committee_index{subnet_id}_beacon_attestation`
/// gossip subnet or they have have come directly from a validator attached to our API.
///
/// If `should_store == true`, the attestation will be added to the `NaiveAggregationPool`.
Unaggregated { should_store: bool },
/// An attestation with one more more signatures that has passed through the aggregation phase
/// of the naive aggregation scheme.
///
/// These attestations must have come from the `beacon_aggregate_and_proof` gossip subnet.
Aggregated,
}
/// The result of a chain segment processing.
#[derive(Debug)]
pub enum ChainSegmentResult {
@@ -190,6 +177,15 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// This pool accepts `Attestation` objects that only have one aggregation bit set and provides
/// a method to get an aggregated `Attestation` for some `AttestationData`.
pub naive_aggregation_pool: NaiveAggregationPool<T::EthSpec>,
/// Contains a store of attestations which have been observed by the beacon chain.
pub observed_attestations: ObservedAttestations<T::EthSpec>,
/// Maintains a record of which validators have been seen to attest in recent epochs.
pub observed_attesters: ObservedAttesters<T::EthSpec>,
/// Maintains a record of which validators have been seen to create `SignedAggregateAndProofs`
/// in recent epochs.
pub observed_aggregators: ObservedAggregators<T::EthSpec>,
/// Maintains a record of which validators have proposed blocks for each slot.
pub observed_block_producers: ObservedBlockProducers<T::EthSpec>,
/// Provides information from the Ethereum 1 (PoW) chain.
pub eth1_chain: Option<Eth1Chain<T::Eth1Chain, T::EthSpec, T::Store>>,
/// Stores a "snapshot" of the chain at the time the head-of-the-chain block was received.
@@ -742,10 +738,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.naive_aggregation_pool.get(data).map_err(Into::into)
}
/// Produce a raw unsigned `Attestation` that is valid for the given `slot` and `index`.
/// Produce an unaggregated `Attestation` that is valid for the given `slot` and `index`.
///
/// The produced `Attestation` will not be valid until it has been signed by exactly one
/// validator that is in the committee for `slot` and `index` in the canonical chain.
///
/// Always attests to the canonical chain.
pub fn produce_attestation(
pub fn produce_unaggregated_attestation(
&self,
slot: Slot,
index: CommitteeIndex,
@@ -758,7 +757,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.ok_or_else(|| Error::CanonicalHeadLockTimeout)?;
if slot >= head.beacon_block.slot() {
self.produce_attestation_for_block(
self.produce_unaggregated_attestation_for_block(
slot,
index,
head.beacon_block_root,
@@ -790,15 +789,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
state.build_committee_cache(RelativeEpoch::Current, &self.spec)?;
self.produce_attestation_for_block(slot, index, beacon_block_root, Cow::Owned(state))
self.produce_unaggregated_attestation_for_block(
slot,
index,
beacon_block_root,
Cow::Owned(state),
)
}
}
/// Produce an `AttestationData` that attests to the chain denoted by `block_root` and `state`.
/// Produces an "unaggregated" attestation for the given `slot` and `index` that attests to
/// `beacon_block_root`. The provided `state` should match the `block.state_root` for the
/// `block` identified by `beacon_block_root`.
///
/// Permits attesting to any arbitrary chain. Generally, the `produce_attestation_data`
/// function should be used as it attests to the canonical chain.
pub fn produce_attestation_for_block(
/// The attestation doesn't _really_ have anything about it that makes it unaggregated per say,
/// however this function is only required in the context of forming an unaggregated
/// attestation. It would be an (undetectable) violation of the protocol to create a
/// `SignedAggregateAndProof` based upon the output of this function.
pub fn produce_unaggregated_attestation_for_block(
&self,
slot: Slot,
index: CommitteeIndex,
@@ -846,393 +854,125 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})
}
/// Accept a new, potentially invalid attestation from the network.
/// Accepts some `Attestation` from the network and attempts to verify it, returning `Ok(_)` if
/// it is valid to be (re)broadcast on the gossip network.
///
/// If valid, the attestation is added to `self.op_pool` and `self.fork_choice`.
///
/// Returns an `Ok(AttestationProcessingOutcome)` if the chain was able to make a determination
/// about the `attestation` (whether it was invalid or not). Returns an `Err` if there was an
/// error during this process and no determination was able to be made.
///
/// ## Notes
///
/// - Whilst the `attestation` is added to fork choice, the head is not updated. That must be
/// done separately.
///
/// The `store_raw` parameter determines if this attestation is to be stored in the operation
/// pool. `None` indicates the attestation is not stored in the operation pool (we don't have a
/// validator required to aggregate these attestations). `Some(true)` indicates we are storing a
/// raw un-aggregated attestation from a subnet into the `op_pool` which is short-lived and `Some(false)`
/// indicates that we are storing an aggregate attestation in the `op_pool`.
pub fn process_attestation(
/// The attestation must be "unaggregated", that is it must have exactly one
/// aggregation bit set.
pub fn verify_unaggregated_attestation_for_gossip(
&self,
attestation: Attestation<T::EthSpec>,
attestation_type: AttestationType,
) -> Result<AttestationProcessingOutcome, Error> {
metrics::inc_counter(&metrics::ATTESTATION_PROCESSING_REQUESTS);
let timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_TIMES);
let outcome = self.process_attestation_internal(attestation.clone(), attestation_type);
match &outcome {
Ok(outcome) => match outcome {
AttestationProcessingOutcome::Processed => {
metrics::inc_counter(&metrics::ATTESTATION_PROCESSING_SUCCESSES);
trace!(
self.log,
"Beacon attestation imported";
"target_epoch" => attestation.data.target.epoch,
"index" => attestation.data.index,
);
let _ = self
.event_handler
.register(EventKind::BeaconAttestationImported {
attestation: Box::new(attestation),
});
}
other => {
trace!(
self.log,
"Beacon attestation rejected";
"reason" => format!("{:?}", other),
);
let _ = self
.event_handler
.register(EventKind::BeaconAttestationRejected {
reason: format!("Invalid attestation: {:?}", other),
attestation: Box::new(attestation),
});
}
},
Err(e) => {
error!(
self.log,
"Beacon attestation processing error";
"error" => format!("{:?}", e),
);
let _ = self
.event_handler
.register(EventKind::BeaconAttestationRejected {
reason: format!("Internal error: {:?}", e),
attestation: Box::new(attestation),
});
}
}
metrics::stop_timer(timer);
outcome
) -> Result<VerifiedUnaggregatedAttestation<T>, AttestationError> {
VerifiedUnaggregatedAttestation::verify(attestation, self)
}
pub fn process_attestation_internal(
/// Accepts some `SignedAggregateAndProof` from the network and attempts to verify it,
/// returning `Ok(_)` if it is valid to be (re)broadcast on the gossip network.
pub fn verify_aggregated_attestation_for_gossip(
&self,
attestation: Attestation<T::EthSpec>,
attestation_type: AttestationType,
) -> Result<AttestationProcessingOutcome, Error> {
let initial_validation_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_INITIAL_VALIDATION_TIMES);
signed_aggregate: SignedAggregateAndProof<T::EthSpec>,
) -> Result<VerifiedAggregatedAttestation<T>, AttestationError> {
VerifiedAggregatedAttestation::verify(signed_aggregate, self)
}
// There is no point in processing an attestation with an empty bitfield. Reject
// it immediately.
if attestation.aggregation_bits.num_set_bits() == 0 {
return Ok(AttestationProcessingOutcome::EmptyAggregationBitfield);
}
/// Accepts some attestation-type object and attempts to verify it in the context of fork
/// choice. If it is valid it is applied to `self.fork_choice`.
///
/// Common items that implement `IntoForkChoiceVerifiedAttestation`:
///
/// - `VerifiedUnaggregatedAttestation`
/// - `VerifiedAggregatedAttestation`
/// - `ForkChoiceVerifiedAttestation`
pub fn apply_attestation_to_fork_choice<'a>(
&self,
unverified_attestation: &'a impl IntoForkChoiceVerifiedAttestation<'a, T>,
) -> Result<ForkChoiceVerifiedAttestation<'a, T>, AttestationError> {
let verified = unverified_attestation.into_fork_choice_verified_attestation(self)?;
let indexed_attestation = verified.indexed_attestation();
self.fork_choice
.process_indexed_attestation(indexed_attestation)
.map_err(|e| Error::from(e))?;
Ok(verified)
}
let attestation_epoch = attestation.data.slot.epoch(T::EthSpec::slots_per_epoch());
let epoch_now = self.epoch()?;
let target = attestation.data.target.clone();
/// Accepts an `VerifiedUnaggregatedAttestation` and attempts to apply it to the "naive
/// aggregation pool".
///
/// The naive aggregation pool is used by local validators to produce
/// `SignedAggregateAndProof`.
///
/// If the attestation is too old (low slot) to be included in the pool it is simply dropped
/// and no error is returned.
pub fn add_to_naive_aggregation_pool(
&self,
unaggregated_attestation: VerifiedUnaggregatedAttestation<T>,
) -> Result<VerifiedUnaggregatedAttestation<T>, AttestationError> {
let attestation = unaggregated_attestation.attestation();
// Attestation must be from the current or previous epoch.
if attestation_epoch > epoch_now {
return Ok(AttestationProcessingOutcome::FutureEpoch {
attestation_epoch,
current_epoch: epoch_now,
});
} else if attestation_epoch + 1 < epoch_now {
return Ok(AttestationProcessingOutcome::PastEpoch {
attestation_epoch,
current_epoch: epoch_now,
});
}
if target.epoch != attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()) {
return Ok(AttestationProcessingOutcome::BadTargetEpoch);
}
// Attestation target must be for a known block.
//
// We use fork choice to find the target root, which means that we reject any attestation
// that has a `target.root` earlier than our latest finalized root. There's no point in
// processing an attestation that does not include our latest finalized block in its chain.
//
// We do not delay consideration for later, we simply drop the attestation.
let (target_block_slot, target_block_state_root) = if let Some((slot, state_root)) =
self.fork_choice.block_slot_and_state_root(&target.root)
{
(slot, state_root)
} else {
return Ok(AttestationProcessingOutcome::UnknownTargetRoot(target.root));
};
// Load the slot and state root for `attestation.data.beacon_block_root`.
//
// This indirectly checks to see if the `attestation.data.beacon_block_root` is in our fork
// choice. Any known, non-finalized block should be in fork choice, so this check
// immediately filters out attestations that attest to a block that has not been processed.
//
// Attestations must be for a known block. If the block is unknown, we simply drop the
// attestation and do not delay consideration for later.
let block_slot = if let Some((slot, _state_root)) = self
.fork_choice
.block_slot_and_state_root(&attestation.data.beacon_block_root)
{
slot
} else {
return Ok(AttestationProcessingOutcome::UnknownHeadBlock {
beacon_block_root: attestation.data.beacon_block_root,
});
};
// TODO: currently we do not check the FFG source/target. This is what the spec dictates
// but it seems wrong.
//
// I have opened an issue on the specs repo for this:
//
// https://github.com/ethereum/eth2.0-specs/issues/1636
//
// We should revisit this code once that issue has been resolved.
// Attestations must not be for blocks in the future. If this is the case, the attestation
// should not be considered.
if block_slot > attestation.data.slot {
return Ok(AttestationProcessingOutcome::AttestsToFutureBlock {
block: block_slot,
attestation: attestation.data.slot,
});
}
metrics::stop_timer(initial_validation_timer);
let cache_wait_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SHUFFLING_CACHE_WAIT_TIMES);
let mut shuffling_cache = self
.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or_else(|| Error::AttestationCacheLockTimeout)?;
metrics::stop_timer(cache_wait_timer);
let indexed_attestation =
if let Some(committee_cache) = shuffling_cache.get(attestation_epoch, target.root) {
if let Some(committee) = committee_cache
.get_beacon_committee(attestation.data.slot, attestation.data.index)
{
let indexed_attestation =
get_indexed_attestation(committee.committee, &attestation)?;
// Drop the shuffling cache to avoid holding the lock for any longer than
// required.
drop(shuffling_cache);
indexed_attestation
} else {
return Ok(AttestationProcessingOutcome::NoCommitteeForSlotAndIndex {
slot: attestation.data.slot,
index: attestation.data.index,
});
}
} else {
// Drop the shuffling cache to avoid holding the lock for any longer than
// required.
drop(shuffling_cache);
debug!(
match self.naive_aggregation_pool.insert(attestation) {
Ok(outcome) => trace!(
self.log,
"Stored unaggregated attestation";
"outcome" => format!("{:?}", outcome),
"index" => attestation.data.index,
"slot" => attestation.data.slot.as_u64(),
),
Err(NaiveAggregationError::SlotTooLow {
slot,
lowest_permissible_slot,
}) => {
trace!(
self.log,
"Attestation processing cache miss";
"attn_epoch" => attestation_epoch.as_u64(),
"head_block_epoch" => block_slot.epoch(T::EthSpec::slots_per_epoch()).as_u64(),
"Refused to store unaggregated attestation";
"lowest_permissible_slot" => lowest_permissible_slot.as_u64(),
"slot" => slot.as_u64(),
);
let state_read_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_READ_TIMES);
let mut state = self
.get_state(&target_block_state_root, Some(target_block_slot))?
.ok_or_else(|| Error::MissingBeaconState(target_block_state_root))?;
metrics::stop_timer(state_read_timer);
let state_skip_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_SKIP_TIMES);
while state.current_epoch() + 1 < attestation_epoch {
// Here we tell `per_slot_processing` to skip hashing the state and just
// use the zero hash instead.
//
// The state roots are not useful for the shuffling, so there's no need to
// compute them.
per_slot_processing(&mut state, Some(Hash256::zero()), &self.spec)?
}
metrics::stop_timer(state_skip_timer);
let committee_building_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_COMMITTEE_BUILDING_TIMES);
let relative_epoch =
RelativeEpoch::from_epoch(state.current_epoch(), attestation_epoch)
.map_err(Error::IncorrectStateForAttestation)?;
state.build_committee_cache(relative_epoch, &self.spec)?;
let committee_cache = state.committee_cache(relative_epoch)?;
self.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or_else(|| Error::AttestationCacheLockTimeout)?
.insert(attestation_epoch, target.root, committee_cache);
metrics::stop_timer(committee_building_timer);
if let Some(committee) = committee_cache
.get_beacon_committee(attestation.data.slot, attestation.data.index)
{
get_indexed_attestation(committee.committee, &attestation)?
} else {
return Ok(AttestationProcessingOutcome::NoCommitteeForSlotAndIndex {
slot: attestation.data.slot,
index: attestation.data.index,
});
}
};
let signature_setup_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SIGNATURE_SETUP_TIMES);
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or_else(|| Error::ValidatorPubkeyCacheLockTimeout)?;
let (fork, genesis_validators_root) = self
.canonical_head
.try_read_for(HEAD_LOCK_TIMEOUT)
.ok_or_else(|| Error::CanonicalHeadLockTimeout)
.map(|head| {
(
head.beacon_state.fork.clone(),
head.beacon_state.genesis_validators_root,
)
})?;
let signature_set = indexed_attestation_signature_set_from_pubkeys(
|validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed),
&attestation.signature,
&indexed_attestation,
&fork,
genesis_validators_root,
&self.spec,
)
.map_err(Error::SignatureSetError)?;
metrics::stop_timer(signature_setup_timer);
let signature_verification_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SIGNATURE_TIMES);
let signature_is_valid = signature_set.is_valid();
metrics::stop_timer(signature_verification_timer);
drop(pubkey_cache);
if signature_is_valid {
// Provide the attestation to fork choice, updating the validator latest messages but
// _without_ finding and updating the head.
if let Err(e) = self
.fork_choice
.process_indexed_attestation(&indexed_attestation)
{
error!(
self.log,
"Add attestation to fork choice failed";
"beacon_block_root" => format!("{}", attestation.data.beacon_block_root),
"error" => format!("{:?}", e)
);
return Err(e.into());
}
// Provide the valid attestation to op pool, which may choose to retain the
// attestation for inclusion in a future block. If we receive an attestation from a
// subnet without a validator responsible for aggregating it, we don't store it in the
// op pool.
if self.eth1_chain.is_some() {
match attestation_type {
AttestationType::Unaggregated { should_store } if should_store => {
match self.naive_aggregation_pool.insert(&attestation) {
Ok(outcome) => trace!(
self.log,
"Stored unaggregated attestation";
"outcome" => format!("{:?}", outcome),
"index" => attestation.data.index,
"slot" => attestation.data.slot.as_u64(),
),
Err(NaiveAggregationError::SlotTooLow {
slot,
lowest_permissible_slot,
}) => {
trace!(
self.log,
"Refused to store unaggregated attestation";
"lowest_permissible_slot" => lowest_permissible_slot.as_u64(),
"slot" => slot.as_u64(),
);
}
Err(e) => error!(
self.log,
"Failed to store unaggregated attestation";
"error" => format!("{:?}", e),
"index" => attestation.data.index,
"slot" => attestation.data.slot.as_u64(),
),
}
}
AttestationType::Unaggregated { .. } => trace!(
Err(e) => {
error!(
self.log,
"Did not store unaggregated attestation";
"Failed to store unaggregated attestation";
"error" => format!("{:?}", e),
"index" => attestation.data.index,
"slot" => attestation.data.slot.as_u64(),
),
AttestationType::Aggregated => {
let index = attestation.data.index;
let slot = attestation.data.slot;
match self.op_pool.insert_attestation(
attestation,
&fork,
genesis_validators_root,
&self.spec,
) {
Ok(_) => {}
Err(e) => {
error!(
self.log,
"Failed to add attestation to op pool";
"error" => format!("{:?}", e),
"index" => index,
"slot" => slot.as_u64(),
);
}
}
}
}
);
return Err(Error::from(e).into());
}
};
// Update the metrics.
metrics::inc_counter(&metrics::ATTESTATION_PROCESSING_SUCCESSES);
Ok(unaggregated_attestation)
}
Ok(AttestationProcessingOutcome::Processed)
} else {
Ok(AttestationProcessingOutcome::InvalidSignature)
/// Accepts a `VerifiedAggregatedAttestation` and attempts to apply it to `self.op_pool`.
///
/// The op pool is used by local block producers to pack blocks with operations.
pub fn add_to_block_inclusion_pool(
&self,
signed_aggregate: VerifiedAggregatedAttestation<T>,
) -> Result<VerifiedAggregatedAttestation<T>, AttestationError> {
// If there's no eth1 chain then it's impossible to produce blocks and therefore
// useless to put things in the op pool.
if self.eth1_chain.is_some() {
let fork = self
.canonical_head
.try_read_for(HEAD_LOCK_TIMEOUT)
.ok_or_else(|| Error::CanonicalHeadLockTimeout)?
.beacon_state
.fork
.clone();
self.op_pool
.insert_attestation(
// TODO: address this clone.
signed_aggregate.attestation().clone(),
&fork,
self.genesis_validators_root,
&self.spec,
)
.map_err(Error::from)?;
}
Ok(signed_aggregate)
}
/// Check that the shuffling at `block_root` is equal to one of the shufflings of `state`.
@@ -1671,6 +1411,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let parent_block = fully_verified_block.parent_block;
let intermediate_states = fully_verified_block.intermediate_states;
let attestation_observation_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_ATTESTATION_OBSERVATION);
// Iterate through the attestations in the block and register them as an "observed
// attestation". This will stop us from propagating them on the gossip network.
for a in &block.body.attestations {
match self.observed_attestations.observe_attestation(a, None) {
// If the observation was successful or if the slot for the attestation was too
// low, continue.
//
// We ignore `SlotTooLow` since this will be very common whilst syncing.
Ok(_) | Err(AttestationObservationError::SlotTooLow { .. }) => {}
Err(e) => return Err(BlockError::BeaconChainError(e.into())),
}
}
metrics::stop_timer(attestation_observation_timer);
let fork_choice_register_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_FORK_CHOICE_REGISTER);
@@ -2104,6 +1862,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
} else {
self.fork_choice.prune()?;
self.observed_block_producers
.prune(new_finalized_epoch.start_slot(T::EthSpec::slots_per_epoch()));
self.snapshot_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.map(|mut snapshot_cache| {

View File

@@ -104,10 +104,18 @@ pub enum BlockError {
},
/// Block is already known, no need to re-import.
BlockIsAlreadyKnown,
/// A block for this proposer and slot has already been observed.
RepeatProposal { proposer: u64, slot: Slot },
/// The block slot exceeds the MAXIMUM_BLOCK_SLOT_NUMBER.
BlockSlotLimitReached,
/// The `BeaconBlock` has a `proposer_index` that does not match the index we computed locally.
///
/// The block is invalid.
IncorrectBlockProposer { block: u64, local_shuffling: u64 },
/// The proposal signature in invalid.
ProposalSignatureInvalid,
/// The `block.proposal_index` is not known.
UnknownValidator(u64),
/// A signature in the block is invalid (exactly which is unknown).
InvalidSignature,
/// The provided block is from an earlier slot than its parent.
@@ -126,7 +134,18 @@ pub enum BlockError {
impl From<BlockSignatureVerifierError> for BlockError {
fn from(e: BlockSignatureVerifierError) -> Self {
BlockError::BeaconChainError(BeaconChainError::BlockSignatureVerifierError(e))
match e {
// Make a special distinction for `IncorrectBlockProposer` since it indicates an
// invalid block, not an internal error.
BlockSignatureVerifierError::IncorrectBlockProposer {
block,
local_shuffling,
} => BlockError::IncorrectBlockProposer {
block,
local_shuffling,
},
e => BlockError::BeaconChainError(BeaconChainError::BlockSignatureVerifierError(e)),
}
}
}
@@ -286,7 +305,17 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
// Do not gossip a block from a finalized slot.
check_block_against_finalized_slot(&block.message, chain)?;
// TODO: add check for the `(block.proposer_index, block.slot)` tuple once we have v0.11.0
// Check that we have not already received a block with a valid signature for this slot.
if chain
.observed_block_producers
.proposer_has_been_observed(&block.message)
.map_err(|e| BlockError::BeaconChainError(e.into()))?
{
return Err(BlockError::RepeatProposal {
proposer: block.message.proposer_index,
slot: block.message.slot,
});
}
let mut parent = load_parent(&block.message, chain)?;
let block_root = get_block_root(&block);
@@ -297,21 +326,54 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
&chain.spec,
)?;
let pubkey_cache = get_validator_pubkey_cache(chain)?;
let signature_is_valid = {
let pubkey_cache = get_validator_pubkey_cache(chain)?;
let pubkey = pubkey_cache
.get(block.message.proposer_index as usize)
.ok_or_else(|| BlockError::UnknownValidator(block.message.proposer_index))?;
block.verify_signature(
Some(block_root),
pubkey,
&state.fork,
chain.genesis_validators_root,
&chain.spec,
)
};
let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec);
signature_verifier.include_block_proposal(&block, Some(block_root))?;
if signature_verifier.verify().is_ok() {
Ok(Self {
block,
block_root,
parent,
})
} else {
Err(BlockError::ProposalSignatureInvalid)
if !signature_is_valid {
return Err(BlockError::ProposalSignatureInvalid);
}
// Now the signature is valid, store the proposal so we don't accept another from this
// validator and slot.
//
// It's important to double-check that the proposer still hasn't been observed so we don't
// have a race-condition when verifying two blocks simultaneously.
if chain
.observed_block_producers
.observe_proposer(&block.message)
.map_err(|e| BlockError::BeaconChainError(e.into()))?
{
return Err(BlockError::RepeatProposal {
proposer: block.message.proposer_index,
slot: block.message.slot,
});
}
let expected_proposer =
state.get_beacon_proposer_index(block.message.slot, &chain.spec)? as u64;
if block.message.proposer_index != expected_proposer {
return Err(BlockError::IncorrectBlockProposer {
block: block.message.proposer_index,
local_shuffling: expected_proposer,
});
}
Ok(Self {
block,
block_root,
parent,
})
}
pub fn block_root(&self) -> Hash256 {

View File

@@ -14,6 +14,8 @@ pub enum BlockProcessingOutcome {
InvalidSignature,
/// The proposal signature in invalid.
ProposalSignatureInvalid,
/// The `block.proposal_index` is not known.
UnknownValidator(u64),
/// The parent block was unknown.
ParentUnknown(Hash256),
/// The block slot is greater than the present slot.
@@ -35,6 +37,11 @@ pub enum BlockProcessingOutcome {
},
/// Block is already known, no need to re-import.
BlockIsAlreadyKnown,
/// A block for this proposer and slot has already been observed.
RepeatProposal {
proposer: u64,
slot: Slot,
},
/// The block slot exceeds the MAXIMUM_BLOCK_SLOT_NUMBER.
BlockSlotLimitReached,
/// The provided block is from an earlier slot than its parent.
@@ -42,6 +49,13 @@ pub enum BlockProcessingOutcome {
block_slot: Slot,
state_slot: Slot,
},
/// The `BeaconBlock` has a `proposer_index` that does not match the index we computed locally.
///
/// The block is invalid.
IncorrectBlockProposer {
block: u64,
local_shuffling: u64,
},
/// At least one block in the chain segement did not have it's parent root set to the root of
/// the prior block.
NonLinearParentRoots,
@@ -78,12 +92,16 @@ impl BlockProcessingOutcome {
finalized_slot,
}),
Err(BlockError::BlockIsAlreadyKnown) => Ok(BlockProcessingOutcome::BlockIsAlreadyKnown),
Err(BlockError::RepeatProposal { proposer, slot }) => {
Ok(BlockProcessingOutcome::RepeatProposal { proposer, slot })
}
Err(BlockError::BlockSlotLimitReached) => {
Ok(BlockProcessingOutcome::BlockSlotLimitReached)
}
Err(BlockError::ProposalSignatureInvalid) => {
Ok(BlockProcessingOutcome::ProposalSignatureInvalid)
}
Err(BlockError::UnknownValidator(i)) => Ok(BlockProcessingOutcome::UnknownValidator(i)),
Err(BlockError::InvalidSignature) => Ok(BlockProcessingOutcome::InvalidSignature),
Err(BlockError::BlockIsNotLaterThanParent {
block_slot,
@@ -92,6 +110,13 @@ impl BlockProcessingOutcome {
block_slot,
state_slot,
}),
Err(BlockError::IncorrectBlockProposer {
block,
local_shuffling,
}) => Ok(BlockProcessingOutcome::IncorrectBlockProposer {
block,
local_shuffling,
}),
Err(BlockError::NonLinearParentRoots) => {
Ok(BlockProcessingOutcome::NonLinearParentRoots)
}

View File

@@ -431,6 +431,14 @@ where
.ok_or_else(|| "Cannot build without op pool".to_string())?,
// TODO: allow for persisting and loading the pool from disk.
naive_aggregation_pool: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_attestations: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_attesters: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_aggregators: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_block_producers: <_>::default(),
eth1_chain: self.eth1_chain,
genesis_validators_root: canonical_head.beacon_state.genesis_validators_root,
canonical_head: TimeoutRwLock::new(canonical_head.clone()),

View File

@@ -1,7 +1,11 @@
use crate::eth1_chain::Error as Eth1ChainError;
use crate::fork_choice::Error as ForkChoiceError;
use crate::naive_aggregation_pool::Error as NaiveAggregationError;
use crate::observed_attestations::Error as ObservedAttestationsError;
use crate::observed_attesters::Error as ObservedAttestersError;
use crate::observed_block_producers::Error as ObservedBlockProducersError;
use operation_pool::OpPoolError;
use safe_arith::ArithError;
use ssz::DecodeError;
use ssz_types::Error as SszTypesError;
use state_processing::{
@@ -66,6 +70,10 @@ pub enum BeaconChainError {
ValidatorPubkeyCacheFileError(String),
OpPoolError(OpPoolError),
NaiveAggregationError(NaiveAggregationError),
ObservedAttestationsError(ObservedAttestationsError),
ObservedAttestersError(ObservedAttestersError),
ObservedBlockProducersError(ObservedBlockProducersError),
ArithError(ArithError),
}
easy_from_to!(SlotProcessingError, BeaconChainError);
@@ -73,7 +81,11 @@ easy_from_to!(AttestationValidationError, BeaconChainError);
easy_from_to!(SszTypesError, BeaconChainError);
easy_from_to!(OpPoolError, BeaconChainError);
easy_from_to!(NaiveAggregationError, BeaconChainError);
easy_from_to!(ObservedAttestationsError, BeaconChainError);
easy_from_to!(ObservedAttestersError, BeaconChainError);
easy_from_to!(ObservedBlockProducersError, BeaconChainError);
easy_from_to!(BlockSignatureVerifierError, BeaconChainError);
easy_from_to!(ArithError, BeaconChainError);
#[derive(Debug, PartialEq)]
pub enum BlockProductionError {

View File

@@ -2,6 +2,7 @@
#[macro_use]
extern crate lazy_static;
pub mod attestation_verification;
mod beacon_chain;
mod beacon_snapshot;
mod block_verification;
@@ -14,6 +15,9 @@ mod head_tracker;
mod metrics;
pub mod migrate;
mod naive_aggregation_pool;
mod observed_attestations;
mod observed_attesters;
mod observed_block_producers;
mod persisted_beacon_chain;
mod shuffling_cache;
mod snapshot_cache;
@@ -22,11 +26,12 @@ mod timeout_rw_lock;
mod validator_pubkey_cache;
pub use self::beacon_chain::{
AttestationProcessingOutcome, AttestationType, BeaconChain, BeaconChainTypes,
ChainSegmentResult, StateSkipConfig,
AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, ChainSegmentResult,
StateSkipConfig,
};
pub use self::beacon_snapshot::BeaconSnapshot;
pub use self::errors::{BeaconChainError, BlockProductionError};
pub use attestation_verification::Error as AttestationError;
pub use block_verification::{BlockError, BlockProcessingOutcome, GossipVerifiedBlock};
pub use eth1_chain::{Eth1Chain, Eth1ChainBackend};
pub use events::EventHandler;

View File

@@ -52,6 +52,10 @@ lazy_static! {
"beacon_block_processing_fork_choice_register_seconds",
"Time spent registering the new block with fork choice (but not finding head)"
);
pub static ref BLOCK_PROCESSING_ATTESTATION_OBSERVATION: Result<Histogram> = try_create_histogram(
"beacon_block_processing_attestation_observation_seconds",
"Time spent hashing and remembering all the attestations in the block"
);
/*
* Block Production

View File

@@ -46,8 +46,6 @@ pub enum Error {
/// stored. This indicates a fairly serious error somewhere in the code that called this
/// function.
InconsistentBitfieldLengths,
/// The function to obtain a map index failed, this is an internal error.
InvalidMapIndex(usize),
/// The given `attestation` was for the incorrect slot. This is an internal error.
IncorrectSlot { expected: Slot, attestation: Slot },
}
@@ -56,30 +54,20 @@ pub enum Error {
/// `attestation` are from the same slot.
struct AggregatedAttestationMap<E: EthSpec> {
map: HashMap<AttestationData, Attestation<E>>,
slot: Slot,
}
impl<E: EthSpec> AggregatedAttestationMap<E> {
/// Create an empty collection that will only contain attestation for the given `slot`.
pub fn new(slot: Slot) -> Self {
/// Create an empty collection with the given `initial_capacity`.
pub fn new(initial_capacity: usize) -> Self {
Self {
slot,
map: <_>::default(),
map: HashMap::with_capacity(initial_capacity),
}
}
/// Insert an attestation into `self`, aggregating it into the pool.
///
/// The given attestation (`a`) must only have one signature and be from the slot that `self`
/// was initialized with.
/// The given attestation (`a`) must only have one signature.
pub fn insert(&mut self, a: &Attestation<E>) -> Result<InsertOutcome, Error> {
if a.data.slot != self.slot {
return Err(Error::IncorrectSlot {
expected: self.slot,
attestation: a.data.slot,
});
}
let set_bits = a
.aggregation_bits
.iter()
@@ -124,15 +112,12 @@ impl<E: EthSpec> AggregatedAttestationMap<E> {
///
/// The given `a.data.slot` must match the slot that `self` was initialized with.
pub fn get(&self, data: &AttestationData) -> Result<Option<Attestation<E>>, Error> {
if data.slot != self.slot {
return Err(Error::IncorrectSlot {
expected: self.slot,
attestation: data.slot,
});
}
Ok(self.map.get(data).cloned())
}
pub fn len(&self) -> usize {
self.map.len()
}
}
/// A pool of `Attestation` that is specially designed to store "unaggregated" attestations from
@@ -158,14 +143,14 @@ impl<E: EthSpec> AggregatedAttestationMap<E> {
/// receives and it can be triggered manually.
pub struct NaiveAggregationPool<E: EthSpec> {
lowest_permissible_slot: RwLock<Slot>,
maps: RwLock<Vec<AggregatedAttestationMap<E>>>,
maps: RwLock<HashMap<Slot, AggregatedAttestationMap<E>>>,
}
impl<E: EthSpec> Default for NaiveAggregationPool<E> {
fn default() -> Self {
Self {
lowest_permissible_slot: RwLock::new(Slot::new(0)),
maps: RwLock::new(vec![]),
maps: RwLock::new(HashMap::new()),
}
}
}
@@ -179,28 +164,46 @@ impl<E: EthSpec> NaiveAggregationPool<E> {
/// The pool may be pruned if the given `attestation.data` has a slot higher than any
/// previously seen.
pub fn insert(&self, attestation: &Attestation<E>) -> Result<InsertOutcome, Error> {
let slot = attestation.data.slot;
let lowest_permissible_slot = *self.lowest_permissible_slot.read();
// Reject any attestations that are too old.
if attestation.data.slot < lowest_permissible_slot {
if slot < lowest_permissible_slot {
return Err(Error::SlotTooLow {
slot: attestation.data.slot,
slot,
lowest_permissible_slot,
});
}
// Prune the pool if this attestation indicates that the current slot has advanced.
if (lowest_permissible_slot + SLOTS_RETAINED as u64) < attestation.data.slot + 1 {
self.prune(attestation.data.slot)
}
let mut maps = self.maps.write();
let index = self.get_map_index(attestation.data.slot);
let outcome = if let Some(map) = maps.get_mut(&slot) {
map.insert(attestation)
} else {
// To avoid re-allocations, try and determine a rough initial capacity for the new item
// by obtaining the mean size of all items in earlier epoch.
let (count, sum) = maps
.iter()
// Only include epochs that are less than the given slot in the average. This should
// generally avoid including recent epochs that are still "filling up".
.filter(|(map_slot, _item)| **map_slot < slot)
.map(|(_slot, map)| map.len())
.fold((0, 0), |(count, sum), len| (count + 1, sum + len));
self.maps
.write()
.get_mut(index)
.ok_or_else(|| Error::InvalidMapIndex(index))?
.insert(attestation)
// Use the mainnet default committee size if we can't determine an average.
let initial_capacity = sum.checked_div(count).unwrap_or(128);
let mut item = AggregatedAttestationMap::new(initial_capacity);
let outcome = item.insert(attestation);
maps.insert(slot, item);
outcome
};
drop(maps);
self.prune(slot);
outcome
}
/// Returns an aggregated `Attestation` with the given `data`, if any.
@@ -208,8 +211,8 @@ impl<E: EthSpec> NaiveAggregationPool<E> {
self.maps
.read()
.iter()
.find(|map| map.slot == data.slot)
.map(|map| map.get(data))
.find(|(slot, _map)| **slot == data.slot)
.map(|(_slot, map)| map.get(data))
.unwrap_or_else(|| Ok(None))
}
@@ -218,41 +221,26 @@ impl<E: EthSpec> NaiveAggregationPool<E> {
pub fn prune(&self, current_slot: Slot) {
// Taking advantage of saturating subtraction on `Slot`.
let lowest_permissible_slot = current_slot - Slot::from(SLOTS_RETAINED);
self.maps
.write()
.retain(|map| map.slot >= lowest_permissible_slot);
*self.lowest_permissible_slot.write() = lowest_permissible_slot;
}
/// Returns the index of `self.maps` that matches `slot`.
///
/// If there is no existing map for this slot one will be created. If `self.maps.len() >=
/// SLOTS_RETAINED`, the map with the lowest slot will be replaced.
fn get_map_index(&self, slot: Slot) -> usize {
let mut maps = self.maps.write();
if let Some(index) = maps.iter().position(|map| map.slot == slot) {
return index;
// Remove any maps that are definitely expired.
maps.retain(|slot, _map| *slot >= lowest_permissible_slot);
// If we have too many maps, remove the lowest amount to ensure we only have
// `SLOTS_RETAINED` left.
if maps.len() > SLOTS_RETAINED {
let mut slots = maps.iter().map(|(slot, _map)| *slot).collect::<Vec<_>>();
// Sort is generally pretty slow, however `SLOTS_RETAINED` is quite low so it should be
// negligible.
slots.sort_unstable();
slots
.into_iter()
.take(maps.len().saturating_sub(SLOTS_RETAINED))
.for_each(|slot| {
maps.remove(&slot);
})
}
if maps.len() < SLOTS_RETAINED || maps.is_empty() {
let index = maps.len();
maps.push(AggregatedAttestationMap::new(slot));
return index;
}
let index = maps
.iter()
.enumerate()
.min_by_key(|(_i, map)| map.slot)
.map(|(i, _map)| i)
.expect("maps cannot be empty due to previous .is_empty() check");
maps[index] = AggregatedAttestationMap::new(slot);
index
}
}
@@ -432,7 +420,7 @@ mod tests {
.maps
.read()
.iter()
.map(|map| map.slot)
.map(|(slot, _map)| *slot)
.collect::<Vec<_>>();
pool_slots.sort_unstable();

View File

@@ -0,0 +1,442 @@
//! Provides an `ObservedAttestations` struct which allows us to reject aggregated attestations if
//! we've already seen the aggregated attestation.
use parking_lot::RwLock;
use std::collections::HashSet;
use std::marker::PhantomData;
use tree_hash::TreeHash;
use types::{Attestation, EthSpec, Hash256, Slot};
/// As a DoS protection measure, the maximum number of distinct `Attestations` that will be
/// recorded for each slot.
///
/// Currently this is set to ~524k. If we say that each entry is 40 bytes (Hash256 (32 bytes) + an
/// 8 byte hash) then this comes to about 20mb per slot. If we're storing 34 of these slots, then
/// we're at 680mb. This is a lot of memory usage, but probably not a show-stopper for most
/// reasonable hardware.
///
/// Upstream conditions should strongly restrict the amount of attestations that can show up in
/// this pool. The maximum size with respect to upstream restrictions is more likely on the order
/// of the number of validators.
const MAX_OBSERVATIONS_PER_SLOT: usize = 1 << 19; // 524,288
#[derive(Debug, PartialEq)]
pub enum ObserveOutcome {
/// This attestation was already known.
AlreadyKnown,
/// This was the first time this attestation was observed.
New,
}
#[derive(Debug, PartialEq)]
pub enum Error {
SlotTooLow {
slot: Slot,
lowest_permissible_slot: Slot,
},
/// The function to obtain a set index failed, this is an internal error.
InvalidSetIndex(usize),
/// We have reached the maximum number of unique `Attestation` that can be observed in a slot.
/// This is a DoS protection function.
ReachedMaxObservationsPerSlot(usize),
IncorrectSlot {
expected: Slot,
attestation: Slot,
},
}
/// A `HashSet` that contains entries related to some `Slot`.
struct SlotHashSet {
set: HashSet<Hash256>,
slot: Slot,
}
impl SlotHashSet {
pub fn new(slot: Slot, initial_capacity: usize) -> Self {
Self {
slot,
set: HashSet::with_capacity(initial_capacity),
}
}
/// Store the attestation in self so future observations recognise its existence.
pub fn observe_attestation<E: EthSpec>(
&mut self,
a: &Attestation<E>,
root: Hash256,
) -> Result<ObserveOutcome, Error> {
if a.data.slot != self.slot {
return Err(Error::IncorrectSlot {
expected: self.slot,
attestation: a.data.slot,
});
}
if self.set.contains(&root) {
Ok(ObserveOutcome::AlreadyKnown)
} else {
// Here we check to see if this slot has reached the maximum observation count.
//
// The resulting behaviour is that we are no longer able to successfully observe new
// attestations, however we will continue to return `is_known` values. We could also
// disable `is_known`, however then we would stop forwarding attestations across the
// gossip network and I think that this is a worse case than sending some invalid ones.
// The underlying libp2p network is responsible for removing duplicate messages, so
// this doesn't risk a broadcast loop.
if self.set.len() >= MAX_OBSERVATIONS_PER_SLOT {
return Err(Error::ReachedMaxObservationsPerSlot(
MAX_OBSERVATIONS_PER_SLOT,
));
}
self.set.insert(root);
Ok(ObserveOutcome::New)
}
}
/// Indicates if `a` has been observed before.
pub fn is_known<E: EthSpec>(&self, a: &Attestation<E>, root: Hash256) -> Result<bool, Error> {
if a.data.slot != self.slot {
return Err(Error::IncorrectSlot {
expected: self.slot,
attestation: a.data.slot,
});
}
Ok(self.set.contains(&root))
}
/// The number of observed attestations in `self`.
pub fn len(&self) -> usize {
self.set.len()
}
}
/// Stores the roots of `Attestation` objects for some number of `Slots`, so we can determine if
/// these have previously been seen on the network.
pub struct ObservedAttestations<E: EthSpec> {
lowest_permissible_slot: RwLock<Slot>,
sets: RwLock<Vec<SlotHashSet>>,
_phantom: PhantomData<E>,
}
impl<E: EthSpec> Default for ObservedAttestations<E> {
fn default() -> Self {
Self {
lowest_permissible_slot: RwLock::new(Slot::new(0)),
sets: RwLock::new(vec![]),
_phantom: PhantomData,
}
}
}
impl<E: EthSpec> ObservedAttestations<E> {
/// Store the root of `a` in `self`.
///
/// `root` must equal `a.tree_hash_root()`.
pub fn observe_attestation(
&self,
a: &Attestation<E>,
root_opt: Option<Hash256>,
) -> Result<ObserveOutcome, Error> {
let index = self.get_set_index(a.data.slot)?;
let root = root_opt.unwrap_or_else(|| a.tree_hash_root());
self.sets
.write()
.get_mut(index)
.ok_or_else(|| Error::InvalidSetIndex(index))
.and_then(|set| set.observe_attestation(a, root))
}
/// Check to see if the `root` of `a` is in self.
///
/// `root` must equal `a.tree_hash_root()`.
pub fn is_known(&self, a: &Attestation<E>, root: Hash256) -> Result<bool, Error> {
let index = self.get_set_index(a.data.slot)?;
self.sets
.read()
.get(index)
.ok_or_else(|| Error::InvalidSetIndex(index))
.and_then(|set| set.is_known(a, root))
}
/// The maximum number of slots that attestations are stored for.
fn max_capacity(&self) -> u64 {
// We add `2` in order to account for one slot either side of the range due to
// `MAXIMUM_GOSSIP_CLOCK_DISPARITY`.
E::slots_per_epoch() + 2
}
/// Removes any attestations with a slot lower than `current_slot` and bars any future
/// attestations with a slot lower than `current_slot - SLOTS_RETAINED`.
pub fn prune(&self, current_slot: Slot) {
// Taking advantage of saturating subtraction on `Slot`.
let lowest_permissible_slot = current_slot - (self.max_capacity() - 1);
self.sets
.write()
.retain(|set| set.slot >= lowest_permissible_slot);
*self.lowest_permissible_slot.write() = lowest_permissible_slot;
}
/// Returns the index of `self.set` that matches `slot`.
///
/// If there is no existing set for this slot one will be created. If `self.sets.len() >=
/// Self::max_capacity()`, the set with the lowest slot will be replaced.
fn get_set_index(&self, slot: Slot) -> Result<usize, Error> {
let lowest_permissible_slot: Slot = *self.lowest_permissible_slot.read();
if slot < lowest_permissible_slot {
return Err(Error::SlotTooLow {
slot,
lowest_permissible_slot,
});
}
// Prune the pool if this attestation indicates that the current slot has advanced.
if lowest_permissible_slot + self.max_capacity() < slot + 1 {
self.prune(slot)
}
let mut sets = self.sets.write();
if let Some(index) = sets.iter().position(|set| set.slot == slot) {
return Ok(index);
}
// To avoid re-allocations, try and determine a rough initial capacity for the new set
// by obtaining the mean size of all items in earlier epoch.
let (count, sum) = sets
.iter()
// Only include slots that are less than the given slot in the average. This should
// generally avoid including recent slots that are still "filling up".
.filter(|set| set.slot < slot)
.map(|set| set.len())
.fold((0, 0), |(count, sum), len| (count + 1, sum + len));
// If we are unable to determine an average, just use 128 as it's the target committee
// size for the mainnet spec. This is perhaps a little wasteful for the minimal spec,
// but considering it's approx. 128 * 32 bytes we're not wasting much.
let initial_capacity = sum.checked_div(count).unwrap_or(128);
if sets.len() < self.max_capacity() as usize || sets.is_empty() {
let index = sets.len();
sets.push(SlotHashSet::new(slot, initial_capacity));
return Ok(index);
}
let index = sets
.iter()
.enumerate()
.min_by_key(|(_i, set)| set.slot)
.map(|(i, _set)| i)
.expect("sets cannot be empty due to previous .is_empty() check");
sets[index] = SlotHashSet::new(slot, initial_capacity);
Ok(index)
}
}
#[cfg(test)]
mod tests {
use super::*;
use tree_hash::TreeHash;
use types::{test_utils::test_random_instance, Hash256};
type E = types::MainnetEthSpec;
const NUM_ELEMENTS: usize = 8;
fn get_attestation(slot: Slot, beacon_block_root: u64) -> Attestation<E> {
let mut a: Attestation<E> = test_random_instance();
a.data.slot = slot;
a.data.beacon_block_root = Hash256::from_low_u64_be(beacon_block_root);
a
}
fn single_slot_test(store: &ObservedAttestations<E>, slot: Slot) {
let attestations = (0..NUM_ELEMENTS as u64)
.map(|i| get_attestation(slot, i))
.collect::<Vec<_>>();
for a in &attestations {
assert_eq!(
store.is_known(a, a.tree_hash_root()),
Ok(false),
"should indicate an unknown attestation is unknown"
);
assert_eq!(
store.observe_attestation(a, None),
Ok(ObserveOutcome::New),
"should observe new attestation"
);
}
for a in &attestations {
assert_eq!(
store.is_known(a, a.tree_hash_root()),
Ok(true),
"should indicate a known attestation is known"
);
assert_eq!(
store.observe_attestation(a, Some(a.tree_hash_root())),
Ok(ObserveOutcome::AlreadyKnown),
"should acknowledge an existing attestation"
);
}
}
#[test]
fn single_slot() {
let store = ObservedAttestations::default();
single_slot_test(&store, Slot::new(0));
assert_eq!(
store.sets.read().len(),
1,
"should have a single set stored"
);
assert_eq!(
store.sets.read()[0].len(),
NUM_ELEMENTS,
"set should have NUM_ELEMENTS elements"
);
}
#[test]
fn mulitple_contiguous_slots() {
let store = ObservedAttestations::default();
let max_cap = store.max_capacity();
for i in 0..max_cap * 3 {
let slot = Slot::new(i);
single_slot_test(&store, slot);
/*
* Ensure that the number of sets is correct.
*/
if i < max_cap {
assert_eq!(
store.sets.read().len(),
i as usize + 1,
"should have a {} sets stored",
i + 1
);
} else {
assert_eq!(
store.sets.read().len(),
max_cap as usize,
"should have max_capacity sets stored"
);
}
/*
* Ensure that each set contains the correct number of elements.
*/
for set in &store.sets.read()[..] {
assert_eq!(
set.len(),
NUM_ELEMENTS,
"each store should have NUM_ELEMENTS elements"
)
}
/*
* Ensure that all the sets have the expected slots
*/
let mut store_slots = store
.sets
.read()
.iter()
.map(|set| set.slot)
.collect::<Vec<_>>();
assert!(
store_slots.len() <= store.max_capacity() as usize,
"store size should not exceed max"
);
store_slots.sort_unstable();
let expected_slots = (i.saturating_sub(max_cap - 1)..=i)
.map(Slot::new)
.collect::<Vec<_>>();
assert_eq!(expected_slots, store_slots, "should have expected slots");
}
}
#[test]
fn mulitple_non_contiguous_slots() {
let store = ObservedAttestations::default();
let max_cap = store.max_capacity();
let to_skip = vec![1_u64, 2, 3, 5, 6, 29, 30, 31, 32, 64];
let slots = (0..max_cap * 3)
.into_iter()
.filter(|i| !to_skip.contains(i))
.collect::<Vec<_>>();
for &i in &slots {
if to_skip.contains(&i) {
continue;
}
let slot = Slot::from(i);
single_slot_test(&store, slot);
/*
* Ensure that each set contains the correct number of elements.
*/
for set in &store.sets.read()[..] {
assert_eq!(
set.len(),
NUM_ELEMENTS,
"each store should have NUM_ELEMENTS elements"
)
}
/*
* Ensure that all the sets have the expected slots
*/
let mut store_slots = store
.sets
.read()
.iter()
.map(|set| set.slot)
.collect::<Vec<_>>();
store_slots.sort_unstable();
assert!(
store_slots.len() <= store.max_capacity() as usize,
"store size should not exceed max"
);
let lowest = store.lowest_permissible_slot.read().as_u64();
let highest = slot.as_u64();
let expected_slots = (lowest..=highest)
.filter(|i| !to_skip.contains(i))
.map(Slot::new)
.collect::<Vec<_>>();
assert_eq!(
expected_slots,
&store_slots[..],
"should have expected slots"
);
}
}
}

View File

@@ -0,0 +1,441 @@
//! Provides two structs that help us filter out attestation gossip from validators that have
//! already published attestations:
//!
//! - `ObservedAttesters`: allows filtering unaggregated attestations from the same validator in
//! the same epoch.
//! - `ObservedAggregators`: allows filtering aggregated attestations from the same aggregators in
//! the same epoch
use bitvec::vec::BitVec;
use parking_lot::RwLock;
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use types::{Attestation, Epoch, EthSpec, Unsigned};
pub type ObservedAttesters<E> = AutoPruningContainer<EpochBitfield, E>;
pub type ObservedAggregators<E> = AutoPruningContainer<EpochHashSet, E>;
#[derive(Debug, PartialEq)]
pub enum Error {
EpochTooLow {
epoch: Epoch,
lowest_permissible_epoch: Epoch,
},
/// We have reached the maximum number of unique `Attestation` that can be observed in a slot.
/// This is a DoS protection function.
ReachedMaxObservationsPerSlot(usize),
/// The function to obtain a set index failed, this is an internal error.
ValidatorIndexTooHigh(usize),
}
/// Implemented on an item in an `AutoPruningContainer`.
pub trait Item {
/// Instantiate `Self` with the given `capacity`.
fn with_capacity(capacity: usize) -> Self;
/// The default capacity for self. Used when we can't guess a reasonable size.
fn default_capacity() -> usize;
/// Returns the number of validator indices stored in `self`.
fn len(&self) -> usize;
/// Store `validator_index` in `self`.
fn insert(&mut self, validator_index: usize) -> bool;
/// Returns `true` if `validator_index` has been stored in `self`.
fn contains(&self, validator_index: usize) -> bool;
}
/// Stores a `BitVec` that represents which validator indices have attested during an epoch.
pub struct EpochBitfield {
bitfield: BitVec,
}
impl Item for EpochBitfield {
fn with_capacity(capacity: usize) -> Self {
Self {
bitfield: BitVec::with_capacity(capacity),
}
}
/// Uses a default size that equals the number of genesis validators.
fn default_capacity() -> usize {
16_384
}
fn len(&self) -> usize {
self.bitfield.len()
}
fn insert(&mut self, validator_index: usize) -> bool {
self.bitfield
.get_mut(validator_index)
.map(|mut bit| {
if *bit {
true
} else {
*bit = true;
false
}
})
.unwrap_or_else(|| {
self.bitfield
.resize(validator_index.saturating_add(1), false);
self.bitfield
.get_mut(validator_index)
.map(|mut bit| *bit = true);
false
})
}
fn contains(&self, validator_index: usize) -> bool {
self.bitfield.get(validator_index).map_or(false, |bit| *bit)
}
}
/// Stores a `HashSet` of which validator indices have created an aggregate attestation during an
/// epoch.
pub struct EpochHashSet {
set: HashSet<usize>,
}
impl Item for EpochHashSet {
fn with_capacity(capacity: usize) -> Self {
Self {
set: HashSet::with_capacity(capacity),
}
}
/// Defaults to the target number of aggregators per committee (16) multiplied by the expected
/// max committee count (64).
fn default_capacity() -> usize {
16 * 64
}
fn len(&self) -> usize {
self.set.len()
}
/// Inserts the `validator_index` in the set. Returns `true` if the `validator_index` was
/// already in the set.
fn insert(&mut self, validator_index: usize) -> bool {
!self.set.insert(validator_index)
}
/// Returns `true` if the `validator_index` is in the set.
fn contains(&self, validator_index: usize) -> bool {
self.set.contains(&validator_index)
}
}
/// A container that stores some number of `T` items.
///
/// This container is "auto-pruning" since it gets an idea of the current slot by which
/// attestations are provided to it and prunes old entries based upon that. For example, if
/// `Self::max_capacity == 32` and an attestation with `a.data.target.epoch` is supplied, then all
/// attestations with an epoch prior to `a.data.target.epoch - 32` will be cleared from the cache.
///
/// `T` should be set to a `EpochBitfield` or `EpochHashSet`.
pub struct AutoPruningContainer<T, E: EthSpec> {
lowest_permissible_epoch: RwLock<Epoch>,
items: RwLock<HashMap<Epoch, T>>,
_phantom: PhantomData<E>,
}
impl<T, E: EthSpec> Default for AutoPruningContainer<T, E> {
fn default() -> Self {
Self {
lowest_permissible_epoch: RwLock::new(Epoch::new(0)),
items: RwLock::new(HashMap::new()),
_phantom: PhantomData,
}
}
}
impl<T: Item, E: EthSpec> AutoPruningContainer<T, E> {
/// Observe that `validator_index` has produced attestation `a`. Returns `Ok(true)` if `a` has
/// previously been observed for `validator_index`.
///
/// ## Errors
///
/// - `validator_index` is higher than `VALIDATOR_REGISTRY_LIMIT`.
/// - `a.data.target.slot` is earlier than `self.earliest_permissible_slot`.
pub fn observe_validator(
&self,
a: &Attestation<E>,
validator_index: usize,
) -> Result<bool, Error> {
self.sanitize_request(a, validator_index)?;
let epoch = a.data.target.epoch;
self.prune(epoch);
let mut items = self.items.write();
if let Some(item) = items.get_mut(&epoch) {
Ok(item.insert(validator_index))
} else {
// To avoid re-allocations, try and determine a rough initial capacity for the new item
// by obtaining the mean size of all items in earlier epoch.
let (count, sum) = items
.iter()
// Only include epochs that are less than the given slot in the average. This should
// generally avoid including recent epochs that are still "filling up".
.filter(|(item_epoch, _item)| **item_epoch < epoch)
.map(|(_epoch, item)| item.len())
.fold((0, 0), |(count, sum), len| (count + 1, sum + len));
let initial_capacity = sum.checked_div(count).unwrap_or(T::default_capacity());
let mut item = T::with_capacity(initial_capacity);
item.insert(validator_index);
items.insert(epoch, item);
Ok(false)
}
}
/// Returns `Ok(true)` if the `validator_index` has produced an attestation conflicting with
/// `a`.
///
/// ## Errors
///
/// - `validator_index` is higher than `VALIDATOR_REGISTRY_LIMIT`.
/// - `a.data.target.slot` is earlier than `self.earliest_permissible_slot`.
pub fn validator_has_been_observed(
&self,
a: &Attestation<E>,
validator_index: usize,
) -> Result<bool, Error> {
self.sanitize_request(a, validator_index)?;
let exists = self
.items
.read()
.get(&a.data.target.epoch)
.map_or(false, |item| item.contains(validator_index));
Ok(exists)
}
fn sanitize_request(&self, a: &Attestation<E>, validator_index: usize) -> Result<(), Error> {
if validator_index > E::ValidatorRegistryLimit::to_usize() {
return Err(Error::ValidatorIndexTooHigh(validator_index));
}
let epoch = a.data.target.epoch;
let lowest_permissible_epoch: Epoch = *self.lowest_permissible_epoch.read();
if epoch < lowest_permissible_epoch {
return Err(Error::EpochTooLow {
epoch,
lowest_permissible_epoch,
});
}
Ok(())
}
/// The maximum number of epochs stored in `self`.
fn max_capacity(&self) -> u64 {
// The current epoch and the previous epoch. This is sufficient whilst
// GOSSIP_CLOCK_DISPARITY is 1/2 a slot or less:
//
// https://github.com/ethereum/eth2.0-specs/pull/1706#issuecomment-610151808
2
}
/// Updates `self` with the current epoch, removing all attestations that become expired
/// relative to `Self::max_capacity`.
///
/// Also sets `self.lowest_permissible_epoch` with relation to `current_epoch` and
/// `Self::max_capacity`.
pub fn prune(&self, current_epoch: Epoch) {
// Taking advantage of saturating subtraction on `Slot`.
let lowest_permissible_epoch = current_epoch - (self.max_capacity().saturating_sub(1));
*self.lowest_permissible_epoch.write() = lowest_permissible_epoch;
self.items
.write()
.retain(|epoch, _item| *epoch >= lowest_permissible_epoch);
}
}
#[cfg(test)]
mod tests {
use super::*;
macro_rules! test_suite {
($mod_name: ident, $type: ident) => {
#[cfg(test)]
mod $mod_name {
use super::*;
use types::test_utils::test_random_instance;
type E = types::MainnetEthSpec;
fn get_attestation(epoch: Epoch) -> Attestation<E> {
let mut a: Attestation<E> = test_random_instance();
a.data.target.epoch = epoch;
a
}
fn single_epoch_test(store: &$type<E>, epoch: Epoch) {
let attesters = [0, 1, 2, 3, 5, 6, 7, 18, 22];
let a = &get_attestation(epoch);
for &i in &attesters {
assert_eq!(
store.validator_has_been_observed(a, i),
Ok(false),
"should indicate an unknown attestation is unknown"
);
assert_eq!(
store.observe_validator(a, i),
Ok(false),
"should observe new attestation"
);
}
for &i in &attesters {
assert_eq!(
store.validator_has_been_observed(a, i),
Ok(true),
"should indicate a known attestation is known"
);
assert_eq!(
store.observe_validator(a, i),
Ok(true),
"should acknowledge an existing attestation"
);
}
}
#[test]
fn single_epoch() {
let store = $type::default();
single_epoch_test(&store, Epoch::new(0));
assert_eq!(
store.items.read().len(),
1,
"should have a single bitfield stored"
);
}
#[test]
fn mulitple_contiguous_epochs() {
let store = $type::default();
let max_cap = store.max_capacity();
for i in 0..max_cap * 3 {
let epoch = Epoch::new(i);
single_epoch_test(&store, epoch);
/*
* Ensure that the number of sets is correct.
*/
if i < max_cap {
assert_eq!(
store.items.read().len(),
i as usize + 1,
"should have a {} items stored",
i + 1
);
} else {
assert_eq!(
store.items.read().len(),
max_cap as usize,
"should have max_capacity items stored"
);
}
/*
* Ensure that all the sets have the expected slots
*/
let mut store_epochs = store
.items
.read()
.iter()
.map(|(epoch, _set)| *epoch)
.collect::<Vec<_>>();
assert!(
store_epochs.len() <= store.max_capacity() as usize,
"store size should not exceed max"
);
store_epochs.sort_unstable();
let expected_epochs = (i.saturating_sub(max_cap - 1)..=i)
.map(Epoch::new)
.collect::<Vec<_>>();
assert_eq!(expected_epochs, store_epochs, "should have expected slots");
}
}
#[test]
fn mulitple_non_contiguous_epochs() {
let store = $type::default();
let max_cap = store.max_capacity();
let to_skip = vec![1_u64, 3, 4, 5];
let epochs = (0..max_cap * 3)
.into_iter()
.filter(|i| !to_skip.contains(i))
.collect::<Vec<_>>();
for &i in &epochs {
if to_skip.contains(&i) {
continue;
}
let epoch = Epoch::from(i);
single_epoch_test(&store, epoch);
/*
* Ensure that all the sets have the expected slots
*/
let mut store_epochs = store
.items
.read()
.iter()
.map(|(epoch, _)| *epoch)
.collect::<Vec<_>>();
store_epochs.sort_unstable();
assert!(
store_epochs.len() <= store.max_capacity() as usize,
"store size should not exceed max"
);
let lowest = store.lowest_permissible_epoch.read().as_u64();
let highest = epoch.as_u64();
let expected_epochs = (lowest..=highest)
.filter(|i| !to_skip.contains(i))
.map(Epoch::new)
.collect::<Vec<_>>();
assert_eq!(
expected_epochs,
&store_epochs[..],
"should have expected epochs"
);
}
}
}
};
}
test_suite!(observed_attesters, ObservedAttesters);
test_suite!(observed_aggregators, ObservedAggregators);
}

View File

@@ -0,0 +1,428 @@
//! Provides the `ObservedBlockProducers` struct which allows for rejecting gossip blocks from
//! validators that have already produced a block.
use parking_lot::RwLock;
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use types::{BeaconBlock, EthSpec, Slot, Unsigned};
#[derive(Debug, PartialEq)]
pub enum Error {
/// The slot of the provided block is prior to finalization and should not have been provided
/// to this function. This is an internal error.
FinalizedBlock { slot: Slot, finalized_slot: Slot },
/// The function to obtain a set index failed, this is an internal error.
ValidatorIndexTooHigh(u64),
}
/// Maintains a cache of observed `(block.slot, block.proposer)`.
///
/// The cache supports pruning based upon the finalized epoch. It does not automatically prune, you
/// must call `Self::prune` manually.
///
/// The maximum size of the cache is determined by `slots_since_finality *
/// VALIDATOR_REGISTRY_LIMIT`. This is quite a large size, so it's important that upstream
/// functions only use this cache for blocks with a valid signature. Only allowing valid signed
/// blocks reduces the theoretical maximum size of this cache to `slots_since_finality *
/// active_validator_count`, however in reality that is more like `slots_since_finality *
/// known_distinct_shufflings` which is much smaller.
pub struct ObservedBlockProducers<E: EthSpec> {
finalized_slot: RwLock<Slot>,
items: RwLock<HashMap<Slot, HashSet<u64>>>,
_phantom: PhantomData<E>,
}
impl<E: EthSpec> Default for ObservedBlockProducers<E> {
/// Instantiates `Self` with `finalized_slot == 0`.
fn default() -> Self {
Self {
finalized_slot: RwLock::new(Slot::new(0)),
items: RwLock::new(HashMap::new()),
_phantom: PhantomData,
}
}
}
impl<E: EthSpec> ObservedBlockProducers<E> {
/// Observe that the `block` was produced by `block.proposer_index` at `block.slot`. This will
/// update `self` so future calls to it indicate that this block is known.
///
/// The supplied `block` **MUST** be signature verified (see struct-level documentation).
///
/// ## Errors
///
/// - `block.proposer_index` is greater than `VALIDATOR_REGISTRY_LIMIT`.
/// - `block.slot` is equal to or less than the latest pruned `finalized_slot`.
pub fn observe_proposer(&self, block: &BeaconBlock<E>) -> Result<bool, Error> {
self.sanitize_block(block)?;
let did_not_exist = self
.items
.write()
.entry(block.slot)
.or_insert_with(|| HashSet::with_capacity(E::SlotsPerEpoch::to_usize()))
.insert(block.proposer_index);
Ok(!did_not_exist)
}
/// Returns `Ok(true)` if the `block` has been observed before, `Ok(false)` if not. Does not
/// update the cache, so calling this function multiple times will continue to return
/// `Ok(false)`, until `Self::observe_proposer` is called.
///
/// ## Errors
///
/// - `block.proposer_index` is greater than `VALIDATOR_REGISTRY_LIMIT`.
/// - `block.slot` is equal to or less than the latest pruned `finalized_slot`.
pub fn proposer_has_been_observed(&self, block: &BeaconBlock<E>) -> Result<bool, Error> {
self.sanitize_block(block)?;
let exists = self
.items
.read()
.get(&block.slot)
.map_or(false, |set| set.contains(&block.proposer_index));
Ok(exists)
}
/// Returns `Ok(())` if the given `block` is sane.
fn sanitize_block(&self, block: &BeaconBlock<E>) -> Result<(), Error> {
if block.proposer_index > E::ValidatorRegistryLimit::to_u64() {
return Err(Error::ValidatorIndexTooHigh(block.proposer_index));
}
let finalized_slot = *self.finalized_slot.read();
if finalized_slot > 0 && block.slot <= finalized_slot {
return Err(Error::FinalizedBlock {
slot: block.slot,
finalized_slot,
});
}
Ok(())
}
/// Removes all observations of blocks equal to or earlier than `finalized_slot`.
///
/// Stores `finalized_slot` in `self`, so that `self` will reject any block that has a slot
/// equal to or less than `finalized_slot`.
///
/// No-op if `finalized_slot == 0`.
pub fn prune(&self, finalized_slot: Slot) {
if finalized_slot == 0 {
return;
}
*self.finalized_slot.write() = finalized_slot;
self.items
.write()
.retain(|slot, _set| *slot > finalized_slot);
}
}
#[cfg(test)]
mod tests {
use super::*;
use types::MainnetEthSpec;
type E = MainnetEthSpec;
fn get_block(slot: u64, proposer: u64) -> BeaconBlock<E> {
let mut block = BeaconBlock::empty(&E::default_spec());
block.slot = slot.into();
block.proposer_index = proposer;
block
}
#[test]
fn pruning() {
let cache = ObservedBlockProducers::default();
assert_eq!(*cache.finalized_slot.read(), 0, "finalized slot is zero");
assert_eq!(cache.items.read().len(), 0, "no slots should be present");
// Slot 0, proposer 0
let block_a = &get_block(0, 0);
assert_eq!(
cache.observe_proposer(block_a),
Ok(false),
"can observe proposer, indicates proposer unobserved"
);
/*
* Preconditions.
*/
assert_eq!(*cache.finalized_slot.read(), 0, "finalized slot is zero");
assert_eq!(
cache.items.read().len(),
1,
"only one slot should be present"
);
assert_eq!(
cache
.items
.read()
.get(&Slot::new(0))
.expect("slot zero should be present")
.len(),
1,
"only one proposer should be present"
);
/*
* Check that a prune at the genesis slot does nothing.
*/
cache.prune(Slot::new(0));
assert_eq!(*cache.finalized_slot.read(), 0, "finalized slot is zero");
assert_eq!(
cache.items.read().len(),
1,
"only one slot should be present"
);
assert_eq!(
cache
.items
.read()
.get(&Slot::new(0))
.expect("slot zero should be present")
.len(),
1,
"only one proposer should be present"
);
/*
* Check that a prune empties the cache
*/
cache.prune(E::slots_per_epoch().into());
assert_eq!(
*cache.finalized_slot.read(),
Slot::from(E::slots_per_epoch()),
"finalized slot is updated"
);
assert_eq!(cache.items.read().len(), 0, "no items left");
/*
* Check that we can't insert a finalized block
*/
// First slot of finalized epoch, proposer 0
let block_b = &get_block(E::slots_per_epoch(), 0);
assert_eq!(
cache.observe_proposer(block_b),
Err(Error::FinalizedBlock {
slot: E::slots_per_epoch().into(),
finalized_slot: E::slots_per_epoch().into(),
}),
"cant insert finalized block"
);
assert_eq!(cache.items.read().len(), 0, "block was not added");
/*
* Check that we _can_ insert a non-finalized block
*/
let three_epochs = E::slots_per_epoch() * 3;
// First slot of finalized epoch, proposer 0
let block_b = &get_block(three_epochs, 0);
assert_eq!(
cache.observe_proposer(block_b),
Ok(false),
"can insert non-finalized block"
);
assert_eq!(
cache.items.read().len(),
1,
"only one slot should be present"
);
assert_eq!(
cache
.items
.read()
.get(&Slot::new(three_epochs))
.expect("the three epochs slot should be present")
.len(),
1,
"only one proposer should be present"
);
/*
* Check that a prune doesnt wipe later blocks
*/
let two_epochs = E::slots_per_epoch() * 2;
cache.prune(two_epochs.into());
assert_eq!(
*cache.finalized_slot.read(),
Slot::from(two_epochs),
"finalized slot is updated"
);
assert_eq!(
cache.items.read().len(),
1,
"only one slot should be present"
);
assert_eq!(
cache
.items
.read()
.get(&Slot::new(three_epochs))
.expect("the three epochs slot should be present")
.len(),
1,
"only one proposer should be present"
);
}
#[test]
fn simple_observations() {
let cache = ObservedBlockProducers::default();
// Slot 0, proposer 0
let block_a = &get_block(0, 0);
assert_eq!(
cache.proposer_has_been_observed(block_a),
Ok(false),
"no observation in empty cache"
);
assert_eq!(
cache.observe_proposer(block_a),
Ok(false),
"can observe proposer, indicates proposer unobserved"
);
assert_eq!(
cache.proposer_has_been_observed(block_a),
Ok(true),
"observed block is indicated as true"
);
assert_eq!(
cache.observe_proposer(block_a),
Ok(true),
"observing again indicates true"
);
assert_eq!(*cache.finalized_slot.read(), 0, "finalized slot is zero");
assert_eq!(
cache.items.read().len(),
1,
"only one slot should be present"
);
assert_eq!(
cache
.items
.read()
.get(&Slot::new(0))
.expect("slot zero should be present")
.len(),
1,
"only one proposer should be present"
);
// Slot 1, proposer 0
let block_b = &get_block(1, 0);
assert_eq!(
cache.proposer_has_been_observed(block_b),
Ok(false),
"no observation for new slot"
);
assert_eq!(
cache.observe_proposer(block_b),
Ok(false),
"can observe proposer for new slot, indicates proposer unobserved"
);
assert_eq!(
cache.proposer_has_been_observed(block_b),
Ok(true),
"observed block in slot 1 is indicated as true"
);
assert_eq!(
cache.observe_proposer(block_b),
Ok(true),
"observing slot 1 again indicates true"
);
assert_eq!(*cache.finalized_slot.read(), 0, "finalized slot is zero");
assert_eq!(cache.items.read().len(), 2, "two slots should be present");
assert_eq!(
cache
.items
.read()
.get(&Slot::new(0))
.expect("slot zero should be present")
.len(),
1,
"only one proposer should be present in slot 0"
);
assert_eq!(
cache
.items
.read()
.get(&Slot::new(1))
.expect("slot zero should be present")
.len(),
1,
"only one proposer should be present in slot 1"
);
// Slot 0, proposer 1
let block_c = &get_block(0, 1);
assert_eq!(
cache.proposer_has_been_observed(block_c),
Ok(false),
"no observation for new proposer"
);
assert_eq!(
cache.observe_proposer(block_c),
Ok(false),
"can observe new proposer, indicates proposer unobserved"
);
assert_eq!(
cache.proposer_has_been_observed(block_c),
Ok(true),
"observed new proposer block is indicated as true"
);
assert_eq!(
cache.observe_proposer(block_c),
Ok(true),
"observing new proposer again indicates true"
);
assert_eq!(*cache.finalized_slot.read(), 0, "finalized slot is zero");
assert_eq!(cache.items.read().len(), 2, "two slots should be present");
assert_eq!(
cache
.items
.read()
.get(&Slot::new(0))
.expect("slot zero should be present")
.len(),
2,
"two proposers should be present in slot 0"
);
assert_eq!(
cache
.items
.read()
.get(&Slot::new(1))
.expect("slot zero should be present")
.len(),
1,
"only one proposer should be present in slot 1"
);
}
}

View File

@@ -7,7 +7,7 @@ use crate::{
builder::{BeaconChainBuilder, Witness},
eth1_chain::CachingEth1Backend,
events::NullEventHandler,
AttestationProcessingOutcome, AttestationType, BeaconChain, BeaconChainTypes, StateSkipConfig,
BeaconChain, BeaconChainTypes, StateSkipConfig,
};
use genesis::interop_genesis_state;
use rayon::prelude::*;
@@ -23,8 +23,8 @@ use tempfile::{tempdir, TempDir};
use tree_hash::TreeHash;
use types::{
AggregateSignature, Attestation, BeaconState, BeaconStateHash, ChainSpec, Domain, EthSpec,
Hash256, Keypair, SecretKey, Signature, SignedBeaconBlock, SignedBeaconBlockHash, SignedRoot,
Slot,
Hash256, Keypair, SecretKey, SelectionProof, Signature, SignedAggregateAndProof,
SignedBeaconBlock, SignedBeaconBlockHash, SignedRoot, Slot,
};
pub use types::test_utils::generate_deterministic_keypairs;
@@ -85,8 +85,24 @@ pub struct BeaconChainHarness<T: BeaconChainTypes> {
impl<E: EthSpec> BeaconChainHarness<HarnessType<E>> {
/// Instantiate a new harness with `validator_count` initial validators.
pub fn new(eth_spec_instance: E, keypairs: Vec<Keypair>) -> Self {
// Setting the target aggregators to really high means that _all_ validators in the
// committee are required to produce an aggregate. This is overkill, however with small
// validator counts it's the only way to be certain there is _at least one_ aggregator per
// committee.
Self::new_with_target_aggregators(eth_spec_instance, keypairs, 1 << 32)
}
/// Instantiate a new harness with `validator_count` initial validators and a custom
/// `target_aggregators_per_committee` spec value
pub fn new_with_target_aggregators(
eth_spec_instance: E,
keypairs: Vec<Keypair>,
target_aggregators_per_committee: u64,
) -> Self {
let data_dir = tempdir().expect("should create temporary data_dir");
let spec = E::default_spec();
let mut spec = E::default_spec();
spec.target_aggregators_per_committee = target_aggregators_per_committee;
let log = NullLoggerBuilder.build().expect("logger should build");
@@ -268,9 +284,9 @@ where
.expect("should not error during block processing");
self.chain.fork_choice().expect("should find head");
head_block_root = Some(block_root);
self.add_free_attestations(&attestation_strategy, &new_state, block_root, slot);
self.add_attestations_for_slot(&attestation_strategy, &new_state, block_root, slot);
state = new_state;
slot += 1;
@@ -312,7 +328,7 @@ where
self.chain.fork_choice().expect("should find head");
let attestation_strategy = AttestationStrategy::SomeValidators(validators.to_vec());
self.add_free_attestations(&attestation_strategy, &new_state, block_root, slot);
self.add_attestations_for_slot(&attestation_strategy, &new_state, block_root, slot);
(block_root.into(), new_state)
}
@@ -448,114 +464,183 @@ where
(signed_block, state)
}
/// Adds attestations to the `BeaconChain` operations pool and fork choice.
/// A list of attestations for each committee for the given slot.
///
/// The `attestation_strategy` dictates which validators should attest.
fn add_free_attestations(
/// The first layer of the Vec is organised per committee. For example, if the return value is
/// called `all_attestations`, then all attestations in `all_attestations[0]` will be for
/// committee 0, whilst all in `all_attestations[1]` will be for committee 1.
pub fn get_unaggregated_attestations(
&self,
attestation_strategy: &AttestationStrategy,
state: &BeaconState<E>,
head_block_root: Hash256,
attestation_slot: Slot,
) -> Vec<Vec<Attestation<E>>> {
let spec = &self.spec;
let fork = &state.fork;
let attesting_validators = self.get_attesting_validators(attestation_strategy);
state
.get_beacon_committees_at_slot(state.slot)
.expect("should get committees")
.iter()
.map(|bc| {
bc.committee
.par_iter()
.enumerate()
.filter_map(|(i, validator_index)| {
if !attesting_validators.contains(validator_index) {
return None;
}
let mut attestation = self
.chain
.produce_unaggregated_attestation_for_block(
attestation_slot,
bc.index,
head_block_root,
Cow::Borrowed(state),
)
.expect("should produce attestation");
attestation
.aggregation_bits
.set(i, true)
.expect("should be able to set aggregation bits");
attestation.signature = {
let domain = spec.get_domain(
attestation.data.target.epoch,
Domain::BeaconAttester,
fork,
state.genesis_validators_root,
);
let message = attestation.data.signing_root(domain);
let mut agg_sig = AggregateSignature::new();
agg_sig.add(&Signature::new(
message.as_bytes(),
self.get_sk(*validator_index),
));
agg_sig
};
Some(attestation)
})
.collect()
})
.collect()
}
fn get_attesting_validators(&self, attestation_strategy: &AttestationStrategy) -> Vec<usize> {
match attestation_strategy {
AttestationStrategy::AllValidators => (0..self.keypairs.len()).collect(),
AttestationStrategy::SomeValidators(vec) => vec.clone(),
}
}
/// Generates a `Vec<Attestation>` for some attestation strategy and head_block.
pub fn add_attestations_for_slot(
&self,
attestation_strategy: &AttestationStrategy,
state: &BeaconState<E>,
head_block_root: Hash256,
head_block_slot: Slot,
) {
self.get_free_attestations(
// These attestations will not be accepted by the chain so no need to generate them.
if state.slot + E::slots_per_epoch() < self.chain.slot().expect("should get slot") {
return;
}
let spec = &self.spec;
let fork = &state.fork;
let attesting_validators = self.get_attesting_validators(attestation_strategy);
let unaggregated_attestations = self.get_unaggregated_attestations(
attestation_strategy,
state,
head_block_root,
head_block_slot,
)
.into_iter()
.for_each(|attestation| {
match self
.chain
.process_attestation(attestation, AttestationType::Aggregated)
.expect("should not error during attestation processing")
{
// PastEpoch can occur if we fork over several epochs
AttestationProcessingOutcome::Processed
| AttestationProcessingOutcome::PastEpoch { .. } => (),
other => panic!("did not successfully process attestation: {:?}", other),
}
});
}
);
/// Generates a `Vec<Attestation>` for some attestation strategy and head_block.
pub fn get_free_attestations(
&self,
attestation_strategy: &AttestationStrategy,
state: &BeaconState<E>,
head_block_root: Hash256,
head_block_slot: Slot,
) -> Vec<Attestation<E>> {
let spec = &self.spec;
let fork = &state.fork;
// Loop through all unaggregated attestations, submit them to the chain and also submit a
// single aggregate.
unaggregated_attestations
.into_iter()
.for_each(|committee_attestations| {
// Submit each unaggregated attestation to the chain.
for attestation in &committee_attestations {
self.chain
.verify_unaggregated_attestation_for_gossip(attestation.clone())
.expect("should not error during attestation processing")
.add_to_pool(&self.chain)
.expect("should add attestation to naive pool");
}
let attesting_validators: Vec<usize> = match attestation_strategy {
AttestationStrategy::AllValidators => (0..self.keypairs.len()).collect(),
AttestationStrategy::SomeValidators(vec) => vec.clone(),
};
// If there are any attestations in this committee, create an aggregate.
if let Some(attestation) = committee_attestations.first() {
let bc = state.get_beacon_committee(attestation.data.slot, attestation.data.index)
.expect("should get committee");
let mut attestations = vec![];
let aggregator_index = bc.committee
.iter()
.find(|&validator_index| {
if !attesting_validators.contains(validator_index) {
return false
}
state
.get_beacon_committees_at_slot(state.slot)
.expect("should get committees")
.iter()
.for_each(|bc| {
let mut local_attestations: Vec<Attestation<E>> = bc
.committee
.par_iter()
.enumerate()
.filter_map(|(i, validator_index)| {
// Note: searching this array is worst-case `O(n)`. A hashset could be a better
// alternative.
if attesting_validators.contains(validator_index) {
let mut attestation = self
.chain
.produce_attestation_for_block(
head_block_slot,
bc.index,
head_block_root,
Cow::Borrowed(state),
)
.expect("should produce attestation");
let selection_proof = SelectionProof::new::<E>(
state.slot,
self.get_sk(*validator_index),
fork,
state.genesis_validators_root,
spec,
);
attestation
.aggregation_bits
.set(i, true)
.expect("should be able to set aggregation bits");
selection_proof.is_aggregator(bc.committee.len(), spec).unwrap_or(false)
})
.copied()
.expect(&format!(
"Committee {} at slot {} with {} attesting validators does not have any aggregators",
bc.index, state.slot, bc.committee.len()
));
attestation.signature = {
let domain = spec.get_domain(
attestation.data.target.epoch,
Domain::BeaconAttester,
fork,
state.genesis_validators_root,
);
// If the chain is able to produce an aggregate, use that. Otherwise, build an
// aggregate locally.
let aggregate = self
.chain
.get_aggregated_attestation(&attestation.data)
.expect("should not error whilst finding aggregate")
.unwrap_or_else(|| {
committee_attestations.iter().skip(1).fold(attestation.clone(), |mut agg, att| {
agg.aggregate(att);
agg
})
});
let message = attestation.data.signing_root(domain);
let signed_aggregate = SignedAggregateAndProof::from_aggregate(
aggregator_index as u64,
aggregate,
None,
self.get_sk(aggregator_index),
fork,
state.genesis_validators_root,
spec,
);
let mut agg_sig = AggregateSignature::new();
agg_sig.add(&Signature::new(
message.as_bytes(),
self.get_sk(*validator_index),
));
agg_sig
};
Some(attestation)
} else {
None
}
})
.collect();
attestations.append(&mut local_attestations);
self.chain
.verify_aggregated_attestation_for_gossip(signed_aggregate)
.expect("should not error during attestation processing")
.add_to_pool(&self.chain)
.expect("should add attestation to naive aggregation pool")
.add_to_fork_choice(&self.chain)
.expect("should add attestation to fork choice");
}
});
attestations
}
/// Creates two forks:

View File

@@ -115,6 +115,11 @@ impl ValidatorPubkeyCache {
pub fn get_index(&self, pubkey: &PublicKeyBytes) -> Option<usize> {
self.indices.get(pubkey).copied()
}
/// Returns the number of validators in the cache.
pub fn len(&self) -> usize {
self.indices.len()
}
}
/// Allows for maintaining an on-disk copy of the `ValidatorPubkeyCache`. The file is raw SSZ bytes