diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index 2e29886d98..a6233cde4b 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -685,7 +685,7 @@ impl VerifiedUnaggregatedAttestation { * for the slot, attestation.data.slot. */ if chain - .observed_attesters + .observed_gossip_attesters .read() .validator_has_been_observed(attestation.data.target.epoch, validator_index as usize) .map_err(BeaconChainError::from)? @@ -712,7 +712,7 @@ impl VerifiedUnaggregatedAttestation { // there can be a race-condition if we receive two attestations at the same time and // process them in different threads. if chain - .observed_attesters + .observed_gossip_attesters .write() .observe_validator(attestation.data.target.epoch, validator_index as usize) .map_err(BeaconChainError::from)? diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 5c42a515cc..5d5741226c 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -245,13 +245,17 @@ pub struct BeaconChain { pub(crate) observed_attestations: RwLock>, /// Contains a store of sync contributions which have been observed by the beacon chain. pub(crate) observed_sync_contributions: RwLock>, - /// Maintains a record of which validators have been seen to attest in recent epochs. - pub(crate) observed_attesters: RwLock>, + /// Maintains a record of which validators have been seen to publish gossip attestations in + /// recent epochs. + pub observed_gossip_attesters: RwLock>, + /// Maintains a record of which validators have been seen to have attestations included in + /// blocks in recent epochs. + pub observed_block_attesters: RwLock>, /// Maintains a record of which validators have been seen sending sync messages in recent epochs. pub(crate) observed_sync_contributors: RwLock>, /// Maintains a record of which validators have been seen to create `SignedAggregateAndProofs` /// in recent epochs. - pub(crate) observed_aggregators: RwLock>, + pub observed_aggregators: RwLock>, /// Maintains a record of which validators have been seen to create `SignedContributionAndProofs` /// in recent epochs. pub(crate) observed_sync_aggregators: RwLock>, @@ -2337,6 +2341,7 @@ impl BeaconChain { for attestation in block.body().attestations() { let _fork_choice_attestation_timer = metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_ATTESTATION_TIMES); + let attestation_target_epoch = attestation.data.target.epoch; let committee = state.get_beacon_committee(attestation.data.slot, attestation.data.index)?; @@ -2351,6 +2356,25 @@ impl BeaconChain { Err(e) => Err(BlockError::BeaconChainError(e.into())), }?; + // To avoid slowing down sync, only register attestations for the + // `observed_block_attesters` if they are from the previous epoch or later. + if attestation_target_epoch + 1 >= current_epoch { + let mut observed_block_attesters = self.observed_block_attesters.write(); + for &validator_index in &indexed_attestation.attesting_indices { + if let Err(e) = observed_block_attesters + .observe_validator(attestation_target_epoch, validator_index as usize) + { + debug!( + self.log, + "Failed to register observed block attester"; + "error" => ?e, + "epoch" => attestation_target_epoch, + "validator_index" => validator_index, + ) + } + } + } + // Only register this with the validator monitor when the block is sufficiently close to // the current slot. if VALIDATOR_MONITOR_HISTORIC_EPOCHS as u64 * T::EthSpec::slots_per_epoch() @@ -3521,8 +3545,12 @@ impl BeaconChain { // It's necessary to assign these checks to intermediate variables to avoid a deadlock. // // See: https://github.com/sigp/lighthouse/pull/2230#discussion_r620013993 - let attested = self - .observed_attesters + let gossip_attested = self + .observed_gossip_attesters + .read() + .index_seen_at_epoch(validator_index, epoch); + let block_attested = self + .observed_block_attesters .read() .index_seen_at_epoch(validator_index, epoch); let aggregated = self @@ -3534,7 +3562,7 @@ impl BeaconChain { .read() .index_seen_at_epoch(validator_index as u64, epoch); - attested || aggregated || produced_block + gossip_attested || block_attested || aggregated || produced_block } } diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index d18a605964..d6524cabeb 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -519,7 +519,9 @@ where // TODO: allow for persisting and loading the pool from disk. observed_sync_contributions: <_>::default(), // TODO: allow for persisting and loading the pool from disk. - observed_attesters: <_>::default(), + observed_gossip_attesters: <_>::default(), + // TODO: allow for persisting and loading the pool from disk. + observed_block_attesters: <_>::default(), // TODO: allow for persisting and loading the pool from disk. observed_sync_contributors: <_>::default(), // TODO: allow for persisting and loading the pool from disk. diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 9791698ac9..6b27dfcfcc 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -848,7 +848,7 @@ fn scrape_attestation_observation(slot_now: Slot, chain: &B let prev_epoch = slot_now.epoch(T::EthSpec::slots_per_epoch()) - 1; if let Some(count) = chain - .observed_attesters + .observed_gossip_attesters .read() .observed_validator_count(prev_epoch) { diff --git a/beacon_node/beacon_chain/src/observed_attesters.rs b/beacon_node/beacon_chain/src/observed_attesters.rs index 043105992d..ed22beaec6 100644 --- a/beacon_node/beacon_chain/src/observed_attesters.rs +++ b/beacon_node/beacon_chain/src/observed_attesters.rs @@ -22,6 +22,21 @@ use std::marker::PhantomData; use types::slot_data::SlotData; use types::{Epoch, EthSpec, Slot, Unsigned}; +/// The maximum capacity of the `AutoPruningEpochContainer`. +/// +/// Fits the next, current and previous epochs. We require the next epoch due to the +/// `MAXIMUM_GOSSIP_CLOCK_DISPARITY`. We require the previous epoch since the specification +/// declares: +/// +/// ```ignore +/// aggregate.data.slot + ATTESTATION_PROPAGATION_SLOT_RANGE +/// >= current_slot >= aggregate.data.slot +/// ``` +/// +/// This means that during the current epoch we will always accept an attestation +/// from at least one slot in the previous epoch. +pub const MAX_CACHED_EPOCHS: u64 = 3; + pub type ObservedAttesters = AutoPruningEpochContainer; pub type ObservedSyncContributors = AutoPruningSlotContainer, E>; @@ -347,18 +362,7 @@ impl AutoPruningEpochContainer { /// The maximum number of epochs stored in `self`. fn max_capacity(&self) -> u64 { - // The next, current and previous epochs. We require the next epoch due to the - // `MAXIMUM_GOSSIP_CLOCK_DISPARITY`. We require the previous epoch since the - // specification delcares: - // - // ``` - // aggregate.data.slot + ATTESTATION_PROPAGATION_SLOT_RANGE - // >= current_slot >= aggregate.data.slot - // ``` - // - // This means that during the current epoch we will always accept an attestation - // from at least one slot in the previous epoch. - 3 + MAX_CACHED_EPOCHS } /// Updates `self` with the current epoch, removing all attestations that become expired diff --git a/beacon_node/beacon_chain/tests/attestation_verification.rs b/beacon_node/beacon_chain/tests/attestation_verification.rs index 101823b8fa..30cde98d64 100644 --- a/beacon_node/beacon_chain/tests/attestation_verification.rs +++ b/beacon_node/beacon_chain/tests/attestation_verification.rs @@ -963,3 +963,114 @@ fn attestation_that_skips_epochs() { .verify_unaggregated_attestation_for_gossip(attestation, Some(subnet_id)) .expect("should gossip verify attestation that skips slots"); } + +#[test] +fn verify_aggregate_for_gossip_doppelganger_detection() { + let harness = get_harness(VALIDATOR_COUNT); + + // Extend the chain out a few epochs so we have some chain depth to play with. + harness.extend_chain( + MainnetEthSpec::slots_per_epoch() as usize * 3 - 1, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + + // Advance into a slot where there have not been blocks or attestations produced. + harness.advance_slot(); + + let current_slot = harness.chain.slot().expect("should get slot"); + + assert_eq!( + current_slot % E::slots_per_epoch(), + 0, + "the test requires a new epoch to avoid already-seen errors" + ); + + let (valid_attestation, _attester_index, _attester_committee_index, _, _) = + get_valid_unaggregated_attestation(&harness.chain); + let (valid_aggregate, _, _) = + get_valid_aggregated_attestation(&harness.chain, valid_attestation); + + harness + .chain + .verify_aggregated_attestation_for_gossip(valid_aggregate.clone()) + .expect("should verify aggregate attestation"); + + let epoch = valid_aggregate.message.aggregate.data.target.epoch; + let index = valid_aggregate.message.aggregator_index as usize; + assert!(harness.chain.validator_seen_at_epoch(index, epoch)); + + // Check the correct beacon cache is populated + assert!(!harness + .chain + .observed_block_attesters + .read() + .validator_has_been_observed(epoch, index) + .expect("should check if block attester was observed")); + assert!(!harness + .chain + .observed_gossip_attesters + .read() + .validator_has_been_observed(epoch, index) + .expect("should check if gossip attester was observed")); + assert!(harness + .chain + .observed_aggregators + .read() + .validator_has_been_observed(epoch, index) + .expect("should check if gossip aggregator was observed")); +} + +#[test] +fn verify_attestation_for_gossip_doppelganger_detection() { + let harness = get_harness(VALIDATOR_COUNT); + + // Extend the chain out a few epochs so we have some chain depth to play with. + harness.extend_chain( + MainnetEthSpec::slots_per_epoch() as usize * 3 - 1, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + + // Advance into a slot where there have not been blocks or attestations produced. + harness.advance_slot(); + + let current_slot = harness.chain.slot().expect("should get slot"); + + assert_eq!( + current_slot % E::slots_per_epoch(), + 0, + "the test requires a new epoch to avoid already-seen errors" + ); + + let (valid_attestation, index, _attester_committee_index, _, subnet_id) = + get_valid_unaggregated_attestation(&harness.chain); + + harness + .chain + .verify_unaggregated_attestation_for_gossip(valid_attestation.clone(), Some(subnet_id)) + .expect("should verify attestation"); + + let epoch = valid_attestation.data.target.epoch; + assert!(harness.chain.validator_seen_at_epoch(index, epoch)); + + // Check the correct beacon cache is populated + assert!(!harness + .chain + .observed_block_attesters + .read() + .validator_has_been_observed(epoch, index) + .expect("should check if block attester was observed")); + assert!(harness + .chain + .observed_gossip_attesters + .read() + .validator_has_been_observed(epoch, index) + .expect("should check if gossip attester was observed")); + assert!(!harness + .chain + .observed_aggregators + .read() + .validator_has_been_observed(epoch, index) + .expect("should check if gossip aggregator was observed")); +} diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index ccdb98a231..e02b80231e 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -9,6 +9,7 @@ use beacon_chain::test_utils::{ use beacon_chain::{BeaconSnapshot, BlockError, ChainConfig, ChainSegmentResult}; use slasher::{Config as SlasherConfig, Slasher}; use state_processing::{ + common::get_indexed_attestation, per_block_processing::{per_block_processing, BlockSignatureStrategy}, per_slot_processing, BlockProcessingError, }; @@ -868,6 +869,52 @@ fn verify_block_for_gossip_slashing_detection() { slasher_dir.close().unwrap(); } +#[test] +fn verify_block_for_gossip_doppelganger_detection() { + let harness = get_harness(VALIDATOR_COUNT); + + let state = harness.get_current_state(); + let (block, _) = harness.make_block(state.clone(), Slot::new(1)); + + let verified_block = harness.chain.verify_block_for_gossip(block).unwrap(); + let attestations = verified_block.block.message().body().attestations().clone(); + harness.chain.process_block(verified_block).unwrap(); + + for att in attestations.iter() { + let epoch = att.data.target.epoch; + let committee = state + .get_beacon_committee(att.data.slot, att.data.index) + .unwrap(); + let indexed_attestation = get_indexed_attestation(committee.committee, att).unwrap(); + + for &index in &indexed_attestation.attesting_indices { + let index = index as usize; + + assert!(harness.chain.validator_seen_at_epoch(index, epoch)); + + // Check the correct beacon cache is populated + assert!(harness + .chain + .observed_block_attesters + .read() + .validator_has_been_observed(epoch, index) + .expect("should check if block attester was observed")); + assert!(!harness + .chain + .observed_gossip_attesters + .read() + .validator_has_been_observed(epoch, index) + .expect("should check if gossip attester was observed")); + assert!(!harness + .chain + .observed_aggregators + .read() + .validator_has_been_observed(epoch, index) + .expect("should check if gossip aggregator was observed")); + } + } +} + #[test] fn add_base_block_to_altair_chain() { let mut spec = MainnetEthSpec::default_spec();