From 3199b1a6f210660694458514756ccdda96e71642 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Thu, 18 Jun 2020 14:41:03 +0530 Subject: [PATCH] Use all attestation subnets (#1257) * Update `milagro_bls` to new release (#1183) * Update milagro_bls to new release Signed-off-by: Kirk Baird * Tidy up fake cryptos Signed-off-by: Kirk Baird * move SecretHash to bls and put plaintext back Signed-off-by: Kirk Baird * Update v0.12.0 to v0.12.1 * Add compute_subnet_for_attestation * Replace CommitteeIndex topic with Attestation * Fix warnings * Fix attestation service tests * fmt * Appease clippy * return error from validator_subscriptions * move state out of loop * Fix early break on error * Get state from slot clock * Fix beacon state in attestation tests * Add failing test for lookahead > 1 * Minor change * Address some review comments * Add subnet verification to beacon chain * Move subnet verification to processor * Pass committee_count_at_slot to ValidatorDuty and ValidatorSubscription * Pass subnet id for publishing attestations * Fix attestation service tests * Fix more tests * Fix fork choice test * Remove unused code * Remove more unused and expensive code Co-authored-by: Kirk Baird Co-authored-by: Michael Sproul Co-authored-by: Age Manning Co-authored-by: Paul Hauner --- .../src/attestation_verification.rs | 92 +++++++++++------ beacon_node/beacon_chain/src/beacon_chain.rs | 3 +- beacon_node/beacon_chain/src/test_utils.rs | 25 +++-- .../tests/attestation_verification.rs | 65 ++++++++++-- beacon_node/beacon_chain/tests/store_tests.rs | 4 +- beacon_node/beacon_chain/tests/tests.rs | 4 +- beacon_node/eth2-libp2p/src/types/pubsub.rs | 4 +- beacon_node/eth2-libp2p/src/types/topics.rs | 30 ++---- .../network/src/attestation_service/mod.rs | 65 ++++++------ .../src/attestation_service/tests/mod.rs | 99 +++++++++++++++---- beacon_node/network/src/router/mod.rs | 1 + beacon_node/network/src/router/processor.rs | 17 +++- beacon_node/network/src/service.rs | 11 +-- beacon_node/rest_api/src/validator.rs | 54 ++++++---- beacon_node/rest_api/tests/test.rs | 19 +++- common/remote_beacon_node/src/lib.rs | 4 +- common/rest_types/src/validator.rs | 4 + consensus/fork_choice/tests/tests.rs | 13 ++- consensus/types/src/attestation.rs | 17 +--- consensus/types/src/beacon_block.rs | 4 +- consensus/types/src/subnet_id.rs | 51 +++++++++- validator_client/src/attestation_service.rs | 52 ++++++---- validator_client/src/duties_service.rs | 5 +- 23 files changed, 444 insertions(+), 199 deletions(-) diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index 43e53b51b3..49fac92f3a 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -52,7 +52,7 @@ use std::borrow::Cow; use tree_hash::TreeHash; use types::{ Attestation, BeaconCommittee, CommitteeIndex, Epoch, EthSpec, Hash256, IndexedAttestation, - RelativeEpoch, SelectionProof, SignedAggregateAndProof, Slot, + RelativeEpoch, SelectionProof, SignedAggregateAndProof, Slot, SubnetId, }; /// Returned when an attestation was not successfully verified. It might not have been verified for @@ -123,6 +123,11 @@ pub enum Error { /// The attestation is attesting to a state that is later than itself. (Viz., attesting to the /// future). AttestsToFutureBlock { block: Slot, attestation: Slot }, + /// The attestation was received on an invalid attestation subnet. + InvalidSubnetId { + received: SubnetId, + expected: SubnetId, + }, /// The attestation failed the `state_processing` verification stage. Invalid(AttestationValidationError), /// There was an error whilst processing the attestation. It is not known if it is valid or invalid. @@ -234,28 +239,29 @@ impl VerifiedAggregatedAttestation { return Err(Error::EmptyAggregationBitfield); } - let indexed_attestation = map_attestation_committee(chain, attestation, |committee| { - // Note: this clones the signature which is known to be a relatively slow operation. - // - // Future optimizations should remove this clone. - let selection_proof = - SelectionProof::from(signed_aggregate.message.selection_proof.clone()); + let indexed_attestation = + map_attestation_committee(chain, attestation, |(committee, _)| { + // Note: this clones the signature which is known to be a relatively slow operation. + // + // Future optimizations should remove this clone. + let selection_proof = + SelectionProof::from(signed_aggregate.message.selection_proof.clone()); - if !selection_proof - .is_aggregator(committee.committee.len(), &chain.spec) - .map_err(|e| Error::BeaconChainError(e.into()))? - { - return Err(Error::InvalidSelectionProof { aggregator_index }); - } + if !selection_proof + .is_aggregator(committee.committee.len(), &chain.spec) + .map_err(|e| Error::BeaconChainError(e.into()))? + { + return Err(Error::InvalidSelectionProof { aggregator_index }); + } - // Ensure the aggregator is a member of the committee for which it is aggregating. - if !committee.committee.contains(&(aggregator_index as usize)) { - return Err(Error::AggregatorNotInCommittee { aggregator_index }); - } + // Ensure the aggregator is a member of the committee for which it is aggregating. + if !committee.committee.contains(&(aggregator_index as usize)) { + return Err(Error::AggregatorNotInCommittee { aggregator_index }); + } - get_indexed_attestation(committee.committee, &attestation) - .map_err(|e| BeaconChainError::from(e).into()) - })?; + get_indexed_attestation(committee.committee, &attestation) + .map_err(|e| BeaconChainError::from(e).into()) + })?; // Ensure that all signatures are valid. if !verify_signed_aggregate_signatures(chain, &signed_aggregate, &indexed_attestation)? { @@ -309,8 +315,12 @@ impl VerifiedAggregatedAttestation { impl VerifiedUnaggregatedAttestation { /// Returns `Ok(Self)` if the `attestation` is valid to be (re)published on the gossip /// network. + /// + /// `subnet_id` is the subnet from which we received this attestation. This function will + /// verify that it was received on the correct subnet. pub fn verify( attestation: Attestation, + subnet_id: SubnetId, chain: &BeaconChain, ) -> Result { // Ensure attestation is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (within a @@ -330,7 +340,23 @@ impl VerifiedUnaggregatedAttestation { // attestation and do not delay consideration for later. verify_head_block_is_known(chain, &attestation)?; - let indexed_attestation = obtain_indexed_attestation(chain, &attestation)?; + let (indexed_attestation, committees_per_slot) = + obtain_indexed_attestation_and_committees_per_slot(chain, &attestation)?; + + let expected_subnet_id = SubnetId::compute_subnet_for_attestation_data::( + &indexed_attestation.data, + committees_per_slot, + &chain.spec, + ) + .map_err(BeaconChainError::from)?; + + // Ensure the attestation is from the correct subnet. + if subnet_id != expected_subnet_id { + return Err(Error::InvalidSubnetId { + received: subnet_id, + expected: expected_subnet_id, + }); + } let validator_index = *indexed_attestation .attesting_indices @@ -566,19 +592,23 @@ pub fn verify_signed_aggregate_signatures( Ok(verify_signature_sets(signature_sets)) } -/// Returns the `indexed_attestation` for the `attestation` using the public keys cached in the -/// `chain`. -pub fn obtain_indexed_attestation( +/// Assists in readability. +type CommitteesPerSlot = u64; + +/// Returns the `indexed_attestation` and committee count per slot for the `attestation` using the +/// public keys cached in the `chain`. +pub fn obtain_indexed_attestation_and_committees_per_slot( chain: &BeaconChain, attestation: &Attestation, -) -> Result, Error> { - map_attestation_committee(chain, attestation, |committee| { +) -> Result<(IndexedAttestation, CommitteesPerSlot), Error> { + map_attestation_committee(chain, attestation, |(committee, committees_per_slot)| { get_indexed_attestation(committee.committee, &attestation) + .map(|attestation| (attestation, committees_per_slot)) .map_err(|e| BeaconChainError::from(e).into()) }) } -/// Runs the `map_fn` with the committee for the given `attestation`. +/// Runs the `map_fn` with the committee and committee count per slot for the given `attestation`. /// /// This function exists in this odd "map" pattern because efficiently obtaining the committee for /// an attestation can be complex. It might involve reading straight from the @@ -594,7 +624,7 @@ pub fn map_attestation_committee<'a, T, F, R>( ) -> Result where T: BeaconChainTypes, - F: Fn(BeaconCommittee) -> Result, + F: Fn((BeaconCommittee, CommitteesPerSlot)) -> Result, { let attestation_epoch = attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()); let target = &attestation.data.target; @@ -624,9 +654,10 @@ where metrics::stop_timer(cache_wait_timer); if let Some(committee_cache) = shuffling_cache.get(attestation_epoch, target.root) { + let committees_per_slot = committee_cache.committees_per_slot(); committee_cache .get_beacon_committee(attestation.data.slot, attestation.data.index) - .map(map_fn) + .map(|committee| map_fn((committee, committees_per_slot))) .unwrap_or_else(|| { Err(Error::NoCommitteeForSlotAndIndex { slot: attestation.data.slot, @@ -689,9 +720,10 @@ where metrics::stop_timer(committee_building_timer); + let committees_per_slot = committee_cache.committees_per_slot(); committee_cache .get_beacon_committee(attestation.data.slot, attestation.data.index) - .map(map_fn) + .map(|committee| map_fn((committee, committees_per_slot))) .unwrap_or_else(|| { Err(Error::NoCommitteeForSlotAndIndex { slot: attestation.data.slot, diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index c7e93b2b2c..cbdfa81746 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -872,12 +872,13 @@ impl BeaconChain { pub fn verify_unaggregated_attestation_for_gossip( &self, attestation: Attestation, + subnet_id: SubnetId, ) -> Result, AttestationError> { metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_REQUESTS); let _timer = metrics::start_timer(&metrics::UNAGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES); - VerifiedUnaggregatedAttestation::verify(attestation, self).map(|v| { + VerifiedUnaggregatedAttestation::verify(attestation, subnet_id, self).map(|v| { metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_SUCCESSES); v }) diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 660eaefd98..e98c21051b 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -24,7 +24,7 @@ use tree_hash::TreeHash; use types::{ AggregateSignature, Attestation, BeaconState, BeaconStateHash, ChainSpec, Domain, EthSpec, Hash256, Keypair, SecretKey, SelectionProof, Signature, SignedAggregateAndProof, - SignedBeaconBlock, SignedBeaconBlockHash, SignedRoot, Slot, + SignedBeaconBlock, SignedBeaconBlockHash, SignedRoot, Slot, SubnetId, }; pub use types::test_utils::generate_deterministic_keypairs; @@ -536,12 +536,16 @@ where state: &BeaconState, head_block_root: Hash256, attestation_slot: Slot, - ) -> Vec>> { + ) -> Vec, SubnetId)>> { let spec = &self.spec; let fork = &state.fork; let attesting_validators = self.get_attesting_validators(attestation_strategy); + let committee_count = state + .get_committee_count_at_slot(state.slot) + .expect("should get committee count"); + state .get_beacon_committees_at_slot(state.slot) .expect("should get committees") @@ -589,7 +593,14 @@ where agg_sig }; - Some(attestation) + let subnet_id = SubnetId::compute_subnet_for_attestation_data::( + &attestation.data, + committee_count, + &self.chain.spec, + ) + .expect("should get subnet_id"); + + Some((attestation, subnet_id)) }) .collect() }) @@ -634,16 +645,16 @@ where .into_iter() .for_each(|committee_attestations| { // Submit each unaggregated attestation to the chain. - for attestation in &committee_attestations { + for (attestation, subnet_id) in &committee_attestations { self.chain - .verify_unaggregated_attestation_for_gossip(attestation.clone()) + .verify_unaggregated_attestation_for_gossip(attestation.clone(), *subnet_id) .expect("should not error during attestation processing") .add_to_pool(&self.chain) .expect("should add attestation to naive pool"); } // If there are any attestations in this committee, create an aggregate. - if let Some(attestation) = committee_attestations.first() { + if let Some((attestation, _)) = committee_attestations.first() { let bc = state.get_beacon_committee(attestation.data.slot, attestation.data.index) .expect("should get committee"); @@ -677,7 +688,7 @@ where .get_aggregated_attestation(&attestation.data) .expect("should not error whilst finding aggregate") .unwrap_or_else(|| { - committee_attestations.iter().skip(1).fold(attestation.clone(), |mut agg, att| { + committee_attestations.iter().skip(1).fold(attestation.clone(), |mut agg, (att, _)| { agg.aggregate(att); agg }) diff --git a/beacon_node/beacon_chain/tests/attestation_verification.rs b/beacon_node/beacon_chain/tests/attestation_verification.rs index 532736670c..370925e736 100644 --- a/beacon_node/beacon_chain/tests/attestation_verification.rs +++ b/beacon_node/beacon_chain/tests/attestation_verification.rs @@ -14,7 +14,7 @@ use tree_hash::TreeHash; use types::{ test_utils::generate_deterministic_keypair, AggregateSignature, Attestation, EthSpec, Hash256, Keypair, MainnetEthSpec, SecretKey, SelectionProof, Signature, SignedAggregateAndProof, - SignedBeaconBlock, Unsigned, + SignedBeaconBlock, SubnetId, Unsigned, }; pub type E = MainnetEthSpec; @@ -49,7 +49,7 @@ fn get_harness(validator_count: usize) -> BeaconChainHarness> { /// Also returns some info about who created it. fn get_valid_unaggregated_attestation( chain: &BeaconChain, -) -> (Attestation, usize, usize, SecretKey) { +) -> (Attestation, usize, usize, SecretKey, SubnetId) { let head = chain.head().expect("should get head"); let current_slot = chain.slot().expect("should get slot"); @@ -78,11 +78,21 @@ fn get_valid_unaggregated_attestation( ) .expect("should sign attestation"); + let subnet_id = SubnetId::compute_subnet_for_attestation_data::( + &valid_attestation.data, + head.beacon_state + .get_committee_count_at_slot(current_slot) + .expect("should get committee count"), + &chain.spec, + ) + .expect("should get subnet_id"); + ( valid_attestation, validator_index, validator_committee_index, validator_sk, + subnet_id, ) } @@ -194,7 +204,7 @@ fn aggregated_gossip_verification() { "the test requires a new epoch to avoid already-seen errors" ); - let (valid_attestation, _attester_index, _attester_committee_index, validator_sk) = + let (valid_attestation, _attester_index, _attester_committee_index, validator_sk, _subnet_id) = get_valid_unaggregated_attestation(&harness.chain); let (valid_aggregate, aggregator_index, aggregator_sk) = get_valid_aggregated_attestation(&harness.chain, valid_attestation); @@ -541,16 +551,21 @@ fn unaggregated_gossip_verification() { "the test requires a new epoch to avoid already-seen errors" ); - let (valid_attestation, expected_validator_index, validator_committee_index, validator_sk) = - get_valid_unaggregated_attestation(&harness.chain); + let ( + valid_attestation, + expected_validator_index, + validator_committee_index, + validator_sk, + subnet_id, + ) = get_valid_unaggregated_attestation(&harness.chain); macro_rules! assert_invalid { - ($desc: tt, $attn_getter: expr, $($error: pat) |+ $( if $guard: expr )?) => { + ($desc: tt, $attn_getter: expr, $subnet_getter: expr, $($error: pat) |+ $( if $guard: expr )?) => { assert!( matches!( harness .chain - .verify_unaggregated_attestation_for_gossip($attn_getter) + .verify_unaggregated_attestation_for_gossip($attn_getter, $subnet_getter) .err() .expect(&format!( "{} should error during verify_unaggregated_attestation_for_gossip", @@ -564,6 +579,29 @@ fn unaggregated_gossip_verification() { }; } + /* + * The following test ensures: + * + * Spec v0.12.1 + * + * The attestation is for the correct subnet (i.e. compute_subnet_for_attestation(state, + * attestation.data.slot, attestation.data.index) == subnet_id). + */ + let id: u64 = subnet_id.into(); + let invalid_subnet_id = SubnetId::new(id + 1); + assert_invalid!( + "attestation from future slot", + { + valid_attestation.clone() + }, + invalid_subnet_id, + AttnError::InvalidSubnetId { + received, + expected, + } + if received == invalid_subnet_id && expected == subnet_id + ); + /* * The following two tests ensure: * @@ -583,6 +621,7 @@ fn unaggregated_gossip_verification() { a.data.slot = future_slot; a }, + subnet_id, AttnError::FutureSlot { attestation_slot, latest_permissible_slot, @@ -602,6 +641,7 @@ fn unaggregated_gossip_verification() { a.data.slot = early_slot; a }, + subnet_id, AttnError::PastSlot { attestation_slot, // Subtract an additional slot since the harness will be exactly on the start of the @@ -634,6 +674,7 @@ fn unaggregated_gossip_verification() { ); a }, + subnet_id, AttnError::NotExactlyOneAggregationBitSet(0) ); @@ -646,6 +687,7 @@ fn unaggregated_gossip_verification() { .expect("should set second aggregation bit"); a }, + subnet_id, AttnError::NotExactlyOneAggregationBitSet(2) ); @@ -665,6 +707,7 @@ fn unaggregated_gossip_verification() { a.data.beacon_block_root = unknown_root; a }, + subnet_id, AttnError::UnknownHeadBlock { beacon_block_root, } @@ -690,13 +733,14 @@ fn unaggregated_gossip_verification() { a }, + subnet_id, AttnError::InvalidSignature ); assert!( harness .chain - .verify_unaggregated_attestation_for_gossip(valid_attestation.clone()) + .verify_unaggregated_attestation_for_gossip(valid_attestation.clone(), subnet_id) .is_ok(), "valid attestation should be verified" ); @@ -714,6 +758,7 @@ fn unaggregated_gossip_verification() { assert_invalid!( "attestation that has already been seen", valid_attestation.clone(), + subnet_id, AttnError::PriorAttestationKnown { validator_index, epoch, @@ -755,7 +800,7 @@ fn attestation_that_skips_epochs() { per_slot_processing(&mut state, None, &harness.spec).expect("should process slot"); } - let attestation = harness + let (attestation, subnet_id) = harness .get_unaggregated_attestations( &AttestationStrategy::AllValidators, &state, @@ -785,6 +830,6 @@ fn attestation_that_skips_epochs() { harness .chain - .verify_unaggregated_attestation_for_gossip(attestation) + .verify_unaggregated_attestation_for_gossip(attestation, subnet_id) .expect("should gossip verify attestation that skips slots"); } diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 89a1837923..c5207b3216 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -293,7 +293,7 @@ fn epoch_boundary_state_attestation_processing() { let mut checked_pre_fin = false; - for attestation in late_attestations.into_iter().flatten() { + for (attestation, subnet_id) in late_attestations.into_iter().flatten() { // load_epoch_boundary_state is idempotent! let block_root = attestation.data.beacon_block_root; let block = store.get_block(&block_root).unwrap().expect("block exists"); @@ -317,7 +317,7 @@ fn epoch_boundary_state_attestation_processing() { let res = harness .chain - .verify_unaggregated_attestation_for_gossip(attestation.clone()); + .verify_unaggregated_attestation_for_gossip(attestation.clone(), subnet_id); let current_slot = harness.chain.slot().expect("should get slot"); let expected_attestation_slot = attestation.data.slot; diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index 8015871c7b..6ea9b5bd97 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -459,10 +459,10 @@ fn attestations_with_increasing_slots() { harness.advance_slot(); } - for attestation in attestations.into_iter().flatten() { + for (attestation, subnet_id) in attestations.into_iter().flatten() { let res = harness .chain - .verify_unaggregated_attestation_for_gossip(attestation.clone()); + .verify_unaggregated_attestation_for_gossip(attestation.clone(), subnet_id); let current_slot = harness.chain.slot().expect("should get slot"); let expected_attestation_slot = attestation.data.slot; diff --git a/beacon_node/eth2-libp2p/src/types/pubsub.rs b/beacon_node/eth2-libp2p/src/types/pubsub.rs index e9cef46c67..93874508bb 100644 --- a/beacon_node/eth2-libp2p/src/types/pubsub.rs +++ b/beacon_node/eth2-libp2p/src/types/pubsub.rs @@ -41,7 +41,7 @@ impl PubsubMessage { PubsubMessage::BeaconBlock(_) => GossipKind::BeaconBlock, PubsubMessage::AggregateAndProofAttestation(_) => GossipKind::BeaconAggregateAndProof, PubsubMessage::Attestation(attestation_data) => { - GossipKind::CommitteeIndex(attestation_data.0) + GossipKind::Attestation(attestation_data.0) } PubsubMessage::VoluntaryExit(_) => GossipKind::VoluntaryExit, PubsubMessage::ProposerSlashing(_) => GossipKind::ProposerSlashing, @@ -97,7 +97,7 @@ impl PubsubMessage { agg_and_proof, ))); } - GossipKind::CommitteeIndex(subnet_id) => { + GossipKind::Attestation(subnet_id) => { let attestation = Attestation::from_ssz_bytes(decompressed_data) .map_err(|e| format!("{:?}", e))?; return Ok(PubsubMessage::Attestation(Box::new(( diff --git a/beacon_node/eth2-libp2p/src/types/topics.rs b/beacon_node/eth2-libp2p/src/types/topics.rs index ce536e2e15..c8021a5dd4 100644 --- a/beacon_node/eth2-libp2p/src/types/topics.rs +++ b/beacon_node/eth2-libp2p/src/types/topics.rs @@ -9,10 +9,7 @@ pub const TOPIC_PREFIX: &str = "eth2"; pub const SSZ_SNAPPY_ENCODING_POSTFIX: &str = "ssz_snappy"; pub const BEACON_BLOCK_TOPIC: &str = "beacon_block"; pub const BEACON_AGGREGATE_AND_PROOF_TOPIC: &str = "beacon_aggregate_and_proof"; -// for speed and easier string manipulation, committee topic index is split into a prefix and a -// postfix. The topic is committee_index{}_beacon_attestation where {} is an integer. -pub const COMMITEE_INDEX_TOPIC_PREFIX: &str = "committee_index"; -pub const COMMITEE_INDEX_TOPIC_POSTFIX: &str = "_beacon_attestation"; +pub const BEACON_ATTESTATION_PREFIX: &str = "beacon_attestation_"; pub const VOLUNTARY_EXIT_TOPIC: &str = "voluntary_exit"; pub const PROPOSER_SLASHING_TOPIC: &str = "proposer_slashing"; pub const ATTESTER_SLASHING_TOPIC: &str = "attester_slashing"; @@ -38,7 +35,7 @@ pub enum GossipKind { /// Topic for publishing aggregate attestations and proofs. BeaconAggregateAndProof, /// Topic for publishing raw attestations on a particular subnet. - CommitteeIndex(SubnetId), + Attestation(SubnetId), /// Topic for publishing voluntary exits. VoluntaryExit, /// Topic for publishing block proposer slashings. @@ -52,7 +49,7 @@ impl std::fmt::Display for GossipKind { match self { GossipKind::BeaconBlock => write!(f, "beacon_block"), GossipKind::BeaconAggregateAndProof => write!(f, "beacon_aggregate_and_proof"), - GossipKind::CommitteeIndex(subnet_id) => write!(f, "committee_index_{}", **subnet_id), + GossipKind::Attestation(subnet_id) => write!(f, "beacon_attestation_{}", **subnet_id), GossipKind::VoluntaryExit => write!(f, "voluntary_exit"), GossipKind::ProposerSlashing => write!(f, "proposer_slashing"), GossipKind::AttesterSlashing => write!(f, "attester_slashing"), @@ -124,7 +121,7 @@ impl GossipTopic { PROPOSER_SLASHING_TOPIC => GossipKind::ProposerSlashing, ATTESTER_SLASHING_TOPIC => GossipKind::AttesterSlashing, topic => match committee_topic_index(topic) { - Some(subnet_id) => GossipKind::CommitteeIndex(subnet_id), + Some(subnet_id) => GossipKind::Attestation(subnet_id), None => return Err(format!("Unknown topic: {}", topic)), }, }; @@ -158,10 +155,7 @@ impl Into for GossipTopic { GossipKind::VoluntaryExit => VOLUNTARY_EXIT_TOPIC.into(), GossipKind::ProposerSlashing => PROPOSER_SLASHING_TOPIC.into(), GossipKind::AttesterSlashing => ATTESTER_SLASHING_TOPIC.into(), - GossipKind::CommitteeIndex(index) => format!( - "{}{}{}", - COMMITEE_INDEX_TOPIC_PREFIX, *index, COMMITEE_INDEX_TOPIC_POSTFIX - ), + GossipKind::Attestation(index) => format!("{}{}", BEACON_ATTESTATION_PREFIX, *index,), }; format!( "/{}/{}/{}/{}", @@ -175,7 +169,7 @@ impl Into for GossipTopic { impl From for GossipKind { fn from(subnet_id: SubnetId) -> Self { - GossipKind::CommitteeIndex(subnet_id) + GossipKind::Attestation(subnet_id) } } @@ -183,17 +177,9 @@ impl From for GossipKind { // Determines if a string is a committee topic. fn committee_topic_index(topic: &str) -> Option { - if topic.starts_with(COMMITEE_INDEX_TOPIC_PREFIX) - && topic.ends_with(COMMITEE_INDEX_TOPIC_POSTFIX) - { + if topic.starts_with(BEACON_ATTESTATION_PREFIX) { return Some(SubnetId::new( - u64::from_str_radix( - topic - .trim_start_matches(COMMITEE_INDEX_TOPIC_PREFIX) - .trim_end_matches(COMMITEE_INDEX_TOPIC_POSTFIX), - 10, - ) - .ok()?, + u64::from_str_radix(topic.trim_start_matches(BEACON_ATTESTATION_PREFIX), 10).ok()?, )); } None diff --git a/beacon_node/network/src/attestation_service/mod.rs b/beacon_node/network/src/attestation_service/mod.rs index 2fb8facea6..9bdf795c8b 100644 --- a/beacon_node/network/src/attestation_service/mod.rs +++ b/beacon_node/network/src/attestation_service/mod.rs @@ -3,12 +3,12 @@ //! determines whether attestations should be aggregated and/or passed to the beacon node. use beacon_chain::{BeaconChain, BeaconChainTypes}; -use eth2_libp2p::{types::GossipKind, MessageId, NetworkGlobals, PeerId}; +use eth2_libp2p::{types::GossipKind, NetworkGlobals}; use futures::prelude::*; use hashset_delay::HashSetDelay; use rand::seq::SliceRandom; use rest_types::ValidatorSubscription; -use slog::{crit, debug, error, o, warn}; +use slog::{crit, debug, error, o, trace, warn}; use slot_clock::SlotClock; use std::collections::VecDeque; use std::pin::Pin; @@ -186,18 +186,34 @@ impl AttestationService { pub fn validator_subscriptions( &mut self, subscriptions: Vec, - ) -> Result<(), ()> { + ) -> Result<(), String> { for subscription in subscriptions { //NOTE: We assume all subscriptions have been verified before reaching this service // Registers the validator with the attestation service. // This will subscribe to long-lived random subnets if required. + trace!(self.log, + "Validator subscription"; + "subscription" => format!("{:?}", subscription), + ); self.add_known_validator(subscription.validator_index); - let subnet_id = SubnetId::new( - subscription.attestation_committee_index - % self.beacon_chain.spec.attestation_subnet_count, - ); + let subnet_id = match SubnetId::compute_subnet::( + subscription.slot, + subscription.attestation_committee_index, + subscription.committee_count_at_slot, + &self.beacon_chain.spec, + ) { + Ok(subnet_id) => subnet_id, + Err(e) => { + warn!(self.log, + "Failed to compute subnet id for validator subscription"; + "error" => format!("{:?}", e), + "validator_index" => subscription.validator_index + ); + continue; + } + }; let exact_subnet = ExactSubnet { subnet_id, @@ -219,9 +235,18 @@ impl AttestationService { if subscription.is_aggregator { // set the subscription timer to subscribe to the next subnet if required - if let Err(e) = self.subscribe_to_subnet(exact_subnet) { - warn!(self.log, "Subscription to subnet error"; "error" => e); - return Err(()); + if let Err(e) = self.subscribe_to_subnet(exact_subnet.clone()) { + warn!(self.log, + "Subscription to subnet error"; + "error" => e, + "validator_index" => subscription.validator_index, + ); + } else { + trace!(self.log, + "Subscribed to subnet for aggregator duties"; + "exact_subnet" => format!("{:?}", exact_subnet), + "validator_index" => subscription.validator_index + ); } } } @@ -232,25 +257,9 @@ impl AttestationService { /// verification, re-propagates and returns false. pub fn should_process_attestation( &mut self, - _message_id: &MessageId, - peer_id: &PeerId, subnet: &SubnetId, attestation: &Attestation, ) -> bool { - // verify the attestation is on the correct subnet - let expected_subnet = match attestation.subnet_id(&self.beacon_chain.spec) { - Ok(v) => v, - Err(e) => { - warn!(self.log, "Could not obtain attestation subnet_id"; "error" => format!("{:?}", e)); - return false; - } - }; - - if expected_subnet != *subnet { - warn!(self.log, "Received an attestation on the wrong subnet"; "subnet_received" => format!("{:?}", subnet), "subnet_expected" => format!("{:?}",expected_subnet), "peer_id" => format!("{}", peer_id)); - return false; - } - let exact_subnet = ExactSubnet { subnet_id: subnet.clone(), slot: attestation.data.slot, @@ -511,7 +520,7 @@ impl AttestationService { self.random_subnets.insert(subnet_id); // if we are not already subscribed, then subscribe - let topic_kind = &GossipKind::CommitteeIndex(subnet_id); + let topic_kind = &GossipKind::Attestation(subnet_id); let already_subscribed = self .network_globals @@ -574,7 +583,7 @@ impl AttestationService { // we are also not un-subscribing from a subnet if the next slot requires us to be // subscribed. Therefore there could be the case that we are already still subscribed // to the required subnet. In which case we do not issue another subscription request. - let topic_kind = &GossipKind::CommitteeIndex(exact_subnet.subnet_id); + let topic_kind = &GossipKind::Attestation(exact_subnet.subnet_id); if self .network_globals .gossipsub_subscriptions diff --git a/beacon_node/network/src/attestation_service/tests/mod.rs b/beacon_node/network/src/attestation_service/tests/mod.rs index 8447808a16..84d792c176 100644 --- a/beacon_node/network/src/attestation_service/tests/mod.rs +++ b/beacon_node/network/src/attestation_service/tests/mod.rs @@ -108,17 +108,23 @@ mod tests { validator_index: u64, attestation_committee_index: CommitteeIndex, slot: Slot, + committee_count_at_slot: u64, ) -> ValidatorSubscription { let is_aggregator = true; ValidatorSubscription { validator_index, attestation_committee_index, slot, + committee_count_at_slot, is_aggregator, } } - fn _get_subscriptions(validator_count: u64, slot: Slot) -> Vec { + fn _get_subscriptions( + validator_count: u64, + slot: Slot, + committee_count_at_slot: u64, + ) -> Vec { let mut subscriptions: Vec = Vec::new(); for validator_index in 0..validator_count { let is_aggregator = true; @@ -126,6 +132,7 @@ mod tests { validator_index, attestation_committee_index: validator_index, slot, + committee_count_at_slot, is_aggregator, }); } @@ -167,6 +174,7 @@ mod tests { let committee_index = 1; let subscription_slot = 0; let no_events_expected = 4; + let committee_count = 1; // create the attestation service and subscriptions let mut attestation_service = get_attestation_service(); @@ -180,6 +188,7 @@ mod tests { validator_index, committee_index, current_slot + Slot::new(subscription_slot), + committee_count, )]; // submit the subscriptions @@ -188,7 +197,15 @@ mod tests { .unwrap(); // not enough time for peer discovery, just subscribe - let expected = vec![AttServiceMessage::Subscribe(SubnetId::new(validator_index))]; + let expected = vec![AttServiceMessage::Subscribe( + SubnetId::compute_subnet::( + current_slot + Slot::new(subscription_slot), + committee_index, + committee_count, + &attestation_service.beacon_chain.spec, + ) + .unwrap(), + )]; let events = get_events(attestation_service, no_events_expected, 1).await; assert_matches!( @@ -215,6 +232,7 @@ mod tests { let committee_index = 1; let subscription_slot = 0; let no_events_expected = 5; + let committee_count = 1; // create the attestation service and subscriptions let mut attestation_service = get_attestation_service(); @@ -228,6 +246,7 @@ mod tests { validator_index, committee_index, current_slot + Slot::new(subscription_slot), + committee_count, )]; // submit the subscriptions @@ -236,9 +255,16 @@ mod tests { .unwrap(); // not enough time for peer discovery, just subscribe, unsubscribe + let subnet_id = SubnetId::compute_subnet::( + current_slot + Slot::new(subscription_slot), + committee_index, + committee_count, + &attestation_service.beacon_chain.spec, + ) + .unwrap(); let expected = vec![ - AttServiceMessage::Subscribe(SubnetId::new(validator_index)), - AttServiceMessage::Unsubscribe(SubnetId::new(validator_index)), + AttServiceMessage::Subscribe(subnet_id), + AttServiceMessage::Unsubscribe(subnet_id), ]; let events = get_events(attestation_service, no_events_expected, 2).await; @@ -266,6 +292,7 @@ mod tests { let committee_index = 1; let subscription_slot = 5; let no_events_expected = 4; + let committee_count = 1; // create the attestation service and subscriptions let mut attestation_service = get_attestation_service(); @@ -279,6 +306,7 @@ mod tests { validator_index, committee_index, current_slot + Slot::new(subscription_slot), + committee_count, )]; // submit the subscriptions @@ -295,10 +323,14 @@ mod tests { ); // just discover peers, don't subscribe yet - let expected = vec![AttServiceMessage::DiscoverPeers { - subnet_id: SubnetId::new(validator_index), - min_ttl, - }]; + let subnet_id = SubnetId::compute_subnet::( + current_slot + Slot::new(subscription_slot), + committee_index, + committee_count, + &attestation_service.beacon_chain.spec, + ) + .unwrap(); + let expected = vec![AttServiceMessage::DiscoverPeers { subnet_id, min_ttl }]; let events = get_events(attestation_service, no_events_expected, 1).await; assert_matches!( @@ -325,6 +357,7 @@ mod tests { let committee_index = 1; let subscription_slot = 5; let no_events_expected = 5; + let committee_count = 1; // create the attestation service and subscriptions let mut attestation_service = get_attestation_service(); @@ -338,6 +371,7 @@ mod tests { validator_index, committee_index, current_slot + Slot::new(subscription_slot), + committee_count, )]; // submit the subscriptions @@ -354,12 +388,16 @@ mod tests { ); // we should discover peers, wait, then subscribe + let subnet_id = SubnetId::compute_subnet::( + current_slot + Slot::new(subscription_slot), + committee_index, + committee_count, + &attestation_service.beacon_chain.spec, + ) + .unwrap(); let expected = vec![ - AttServiceMessage::DiscoverPeers { - subnet_id: SubnetId::new(validator_index), - min_ttl, - }, - AttServiceMessage::Subscribe(SubnetId::new(validator_index)), + AttServiceMessage::DiscoverPeers { subnet_id, min_ttl }, + AttServiceMessage::Subscribe(subnet_id), ]; let events = get_events(attestation_service, no_events_expected, 5).await; @@ -387,6 +425,7 @@ mod tests { let committee_index = 1; let subscription_slot = 7; let no_events_expected = 3; + let committee_count = 1; // create the attestation service and subscriptions let mut attestation_service = get_attestation_service(); @@ -400,6 +439,7 @@ mod tests { validator_index, committee_index, current_slot + Slot::new(subscription_slot), + committee_count, )]; // submit the subscriptions @@ -436,9 +476,11 @@ mod tests { let committee_index = 1; let subscription_slot = 10; let no_events_expected = 4; + let committee_count = 1; // create the attestation service and subscriptions let mut attestation_service = get_attestation_service(); + let current_slot = attestation_service .beacon_chain .slot_clock @@ -449,6 +491,7 @@ mod tests { validator_index, committee_index, current_slot + Slot::new(subscription_slot), + committee_count, )]; // submit the subscriptions @@ -464,11 +507,17 @@ mod tests { .unwrap(), ); + let subnet_id = SubnetId::compute_subnet::( + current_slot + Slot::new(subscription_slot), + committee_index, + committee_count, + &attestation_service.beacon_chain.spec, + ) + .unwrap(); + // expect discover peers because we will enter TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD range - let expected: Vec = vec![AttServiceMessage::DiscoverPeers { - subnet_id: SubnetId::new(validator_index), - min_ttl, - }]; + let expected: Vec = + vec![AttServiceMessage::DiscoverPeers { subnet_id, min_ttl }]; let events = get_events(attestation_service, no_events_expected, 5).await; @@ -494,6 +543,7 @@ mod tests { // subscribe 10 slots ahead so we do not produce any exact subnet messages let subscription_slot = 10; let subscription_count = 64; + let committee_count = 1; // create the attestation service and subscriptions let mut attestation_service = get_attestation_service(); @@ -503,8 +553,11 @@ mod tests { .now() .expect("Could not get current slot"); - let subscriptions = - _get_subscriptions(subscription_count, current_slot + subscription_slot); + let subscriptions = _get_subscriptions( + subscription_count, + current_slot + subscription_slot, + committee_count, + ); // submit the subscriptions attestation_service @@ -542,6 +595,7 @@ mod tests { let subscription_slot = 10; // the 65th subscription should result in no more messages than the previous scenario let subscription_count = 65; + let committee_count = 1; // create the attestation service and subscriptions let mut attestation_service = get_attestation_service(); @@ -551,8 +605,11 @@ mod tests { .now() .expect("Could not get current slot"); - let subscriptions = - _get_subscriptions(subscription_count, current_slot + subscription_slot); + let subscriptions = _get_subscriptions( + subscription_count, + current_slot + subscription_slot, + committee_count, + ); // submit the subscriptions attestation_service diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 025914d6bd..b44dcbe7e8 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -234,6 +234,7 @@ impl Router { self.processor.verify_unaggregated_attestation_for_gossip( peer_id.clone(), subnet_attestation.1.clone(), + subnet_attestation.0, ) { self.propagate_message(id, peer_id.clone()); diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index aa110d4279..5fd19675c4 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use tokio::sync::mpsc; use types::{ Attestation, ChainSpec, Epoch, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, - Slot, + Slot, SubnetId, }; //TODO: Rate limit requests @@ -758,6 +758,18 @@ impl Processor { * The peer has published an invalid consensus message. */ } + + AttnError::InvalidSubnetId { received, expected } => { + /* + * The attestation was received on an incorrect subnet id. + */ + debug!( + self.log, + "Received attestation on incorrect subnet"; + "expected" => format!("{:?}", expected), + "received" => format!("{:?}", received), + ) + } AttnError::Invalid(_) => { /* * The attestation failed the state_processing verification. @@ -833,12 +845,13 @@ impl Processor { &mut self, peer_id: PeerId, unaggregated_attestation: Attestation, + subnet_id: SubnetId, ) -> Option> { // This is provided to the error handling function to assist with debugging. let beacon_block_root = unaggregated_attestation.data.beacon_block_root; self.chain - .verify_unaggregated_attestation_for_gossip(unaggregated_attestation) + .verify_unaggregated_attestation_for_gossip(unaggregated_attestation, subnet_id) .map_err(|e| { self.handle_attestation_verification_failure( peer_id, diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index edd480f792..3863ed2c37 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -14,7 +14,7 @@ use eth2_libp2p::{ use eth2_libp2p::{BehaviourEvent, Enr, MessageId, NetworkGlobals, PeerId}; use futures::prelude::*; use rest_types::ValidatorSubscription; -use slog::{debug, error, info, o, trace}; +use slog::{debug, error, info, o, trace, warn}; use std::sync::Arc; use std::time::Duration; use store::HotColdDB; @@ -236,10 +236,11 @@ fn spawn_service( ); } NetworkMessage::Subscribe { subscriptions } => { - // the result is dropped as it used solely for ergonomics - let _ = service + if let Err(e) = service .attestation_service - .validator_subscriptions(subscriptions); + .validator_subscriptions(subscriptions) { + warn!(service.log, "Validator subscription failed"; "error" => e); + } } } } @@ -327,8 +328,6 @@ fn spawn_service( // checks if we have an aggregator for the slot. If so, we process // the attestation if service.attestation_service.should_process_attestation( - &id, - &source, subnet, attestation, ) { diff --git a/beacon_node/rest_api/src/validator.rs b/beacon_node/rest_api/src/validator.rs index eaec575c00..7c1f1bdc04 100644 --- a/beacon_node/rest_api/src/validator.rs +++ b/beacon_node/rest_api/src/validator.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use types::beacon_state::EthSpec; use types::{ Attestation, AttestationData, BeaconState, Epoch, RelativeEpoch, SelectionProof, - SignedAggregateAndProof, SignedBeaconBlock, Slot, + SignedAggregateAndProof, SignedBeaconBlock, Slot, SubnetId, }; /// HTTP Handler to retrieve the duties for a set of validators during a particular epoch. This @@ -220,6 +220,16 @@ fn return_validator_duties( )) })?; + let committee_count_at_slot = duties + .map(|d| state.get_committee_count_at_slot(d.slot)) + .transpose() + .map_err(|e| { + ApiError::ServerError(format!( + "Unable to find committee count at slot: {:?}", + e + )) + })?; + let aggregator_modulo = duties .map(|duties| SelectionProof::modulo(duties.committee_len, &beacon_chain.spec)) .transpose() @@ -238,6 +248,7 @@ fn return_validator_duties( validator_index: Some(validator_index as u64), attestation_slot: duties.map(|d| d.slot), attestation_committee_index: duties.map(|d| d.index), + committee_count_at_slot, attestation_committee_position: duties.map(|d| d.committee_position), block_proposal_slots, aggregator_modulo, @@ -249,6 +260,7 @@ fn return_validator_duties( attestation_slot: None, attestation_committee_index: None, attestation_committee_position: None, + committee_count_at_slot: None, block_proposal_slots: vec![], aggregator_modulo: None, }) @@ -443,21 +455,24 @@ pub async fn publish_attestations( )) }) // Process all of the aggregates _without_ exiting early if one fails. - .map(move |attestations: Vec>| { - attestations - .into_par_iter() - .enumerate() - .map(|(i, attestation)| { - process_unaggregated_attestation( - &beacon_chain, - network_chan.clone(), - attestation, - i, - &log, - ) - }) - .collect::>>() - }) + .map( + move |attestations: Vec<(Attestation, SubnetId)>| { + attestations + .into_par_iter() + .enumerate() + .map(|(i, (attestation, subnet_id))| { + process_unaggregated_attestation( + &beacon_chain, + network_chan.clone(), + attestation, + subnet_id, + i, + &log, + ) + }) + .collect::>>() + }, + ) // Iterate through all the results and return on the first `Err`. // // Note: this will only provide info about the _first_ failure, not all failures. @@ -471,6 +486,7 @@ fn process_unaggregated_attestation( beacon_chain: &BeaconChain, network_chan: NetworkChannel, attestation: Attestation, + subnet_id: SubnetId, i: usize, log: &Logger, ) -> Result<(), ApiError> { @@ -478,7 +494,7 @@ fn process_unaggregated_attestation( // Verify that the attestation is valid to included on the gossip network. let verified_attestation = beacon_chain - .verify_unaggregated_attestation_for_gossip(attestation.clone()) + .verify_unaggregated_attestation_for_gossip(attestation.clone(), subnet_id) .map_err(|e| { handle_attestation_error( e, @@ -491,9 +507,7 @@ fn process_unaggregated_attestation( // Publish the attestation to the network if let Err(e) = network_chan.send(NetworkMessage::Publish { messages: vec![PubsubMessage::Attestation(Box::new(( - attestation - .subnet_id(&beacon_chain.spec) - .map_err(|e| ApiError::ServerError(format!("Unable to get subnet id: {:?}", e)))?, + subnet_id, attestation, )))], }) { diff --git a/beacon_node/rest_api/tests/test.rs b/beacon_node/rest_api/tests/test.rs index 2cb239dbcd..39994c81d8 100644 --- a/beacon_node/rest_api/tests/test.rs +++ b/beacon_node/rest_api/tests/test.rs @@ -22,7 +22,7 @@ use types::{ }, BeaconBlock, BeaconState, ChainSpec, Domain, Epoch, EthSpec, MinimalEthSpec, PublicKey, RelativeEpoch, Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedRoot, Slot, - Validator, + SubnetId, Validator, }; use version; @@ -144,7 +144,16 @@ fn validator_produce_attestation() { )) .expect("should fetch duties from http api"); let duties = &duties[0]; - + let committee_count = duties + .committee_count_at_slot + .expect("should have committee count"); + let subnet_id = SubnetId::compute_subnet::( + attestation.data.slot, + attestation.data.index, + committee_count, + spec, + ) + .unwrap(); // Try publishing the attestation without a signature or a committee bit set, ensure it is // raises an error. let publish_status = env @@ -153,7 +162,7 @@ fn validator_produce_attestation() { remote_node .http .validator() - .publish_attestations(vec![attestation.clone()]), + .publish_attestations(vec![(attestation.clone(), subnet_id)]), ) .expect("should publish unsigned attestation"); assert!( @@ -179,7 +188,7 @@ fn validator_produce_attestation() { remote_node .http .validator() - .publish_attestations(vec![attestation.clone()]), + .publish_attestations(vec![(attestation.clone(), subnet_id)]), ) .expect("should publish attestation with invalid signature"); assert!( @@ -217,7 +226,7 @@ fn validator_produce_attestation() { remote_node .http .validator() - .publish_attestations(vec![attestation.clone()]), + .publish_attestations(vec![(attestation.clone(), subnet_id)]), ) .expect("should publish attestation"); assert!( diff --git a/common/remote_beacon_node/src/lib.rs b/common/remote_beacon_node/src/lib.rs index c89f95ab61..a0b657ec12 100644 --- a/common/remote_beacon_node/src/lib.rs +++ b/common/remote_beacon_node/src/lib.rs @@ -12,7 +12,7 @@ use std::time::Duration; use types::{ Attestation, AttestationData, AttesterSlashing, BeaconBlock, BeaconState, CommitteeIndex, Epoch, EthSpec, Fork, Hash256, ProposerSlashing, PublicKey, PublicKeyBytes, Signature, - SignedAggregateAndProof, SignedBeaconBlock, Slot, + SignedAggregateAndProof, SignedBeaconBlock, Slot, SubnetId, }; use url::Url; @@ -227,7 +227,7 @@ impl Validator { /// Posts a list of attestations to the beacon node, expecting it to verify it and publish it to the network. pub async fn publish_attestations( &self, - attestation: Vec>, + attestation: Vec<(Attestation, SubnetId)>, ) -> Result { let client = self.0.clone(); let url = self.url("attestations")?; diff --git a/common/rest_types/src/validator.rs b/common/rest_types/src/validator.rs index a3bd34e7ee..2bbf019ffc 100644 --- a/common/rest_types/src/validator.rs +++ b/common/rest_types/src/validator.rs @@ -22,6 +22,8 @@ pub struct ValidatorDutyBase { pub attestation_committee_index: Option, /// The position of the validator in the committee. pub attestation_committee_position: Option, + /// The committee count at `attestation_slot`. + pub committee_count_at_slot: Option, /// The slots in which a validator must propose a block (can be empty). pub block_proposal_slots: Vec, /// This provides the modulo: `max(1, len(committee) // TARGET_AGGREGATORS_PER_COMMITTEE)` @@ -66,6 +68,8 @@ pub struct ValidatorSubscription { pub attestation_committee_index: CommitteeIndex, /// The slot in which to subscribe. pub slot: Slot, + /// Committee count at slot to subscribe. + pub committee_count_at_slot: u64, /// If true, the validator is an aggregator and the beacon node should aggregate attestations /// for this slot. pub is_aggregator: bool, diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index 3746f0ba14..0ef4d0edb1 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -12,7 +12,7 @@ use std::sync::Mutex; use store::{MemoryStore, StoreConfig}; use types::{ test_utils::{generate_deterministic_keypair, generate_deterministic_keypairs}, - Epoch, EthSpec, IndexedAttestation, MainnetEthSpec, Slot, + Epoch, EthSpec, IndexedAttestation, MainnetEthSpec, Slot, SubnetId, }; use types::{BeaconBlock, BeaconState, Hash256, SignedBeaconBlock}; @@ -285,6 +285,15 @@ impl ForkChoiceTest { .get(validator_committee_index) .expect("there should be an attesting validator"); + let committee_count = head + .beacon_state + .get_committee_count_at_slot(current_slot) + .expect("should not error while getting committee count"); + + let subnet_id = + SubnetId::compute_subnet::(current_slot, 0, committee_count, &chain.spec) + .expect("should compute subnet id"); + let validator_sk = generate_deterministic_keypair(validator_index).sk; attestation @@ -298,7 +307,7 @@ impl ForkChoiceTest { .expect("should sign attestation"); let mut verified_attestation = chain - .verify_unaggregated_attestation_for_gossip(attestation) + .verify_unaggregated_attestation_for_gossip(attestation, subnet_id) .expect("precondition: should gossip verify attestation"); if let MutationDelay::Blocks(slots) = delay { diff --git a/consensus/types/src/attestation.rs b/consensus/types/src/attestation.rs index 26052cbaa9..fd355959e4 100644 --- a/consensus/types/src/attestation.rs +++ b/consensus/types/src/attestation.rs @@ -1,10 +1,9 @@ use super::{ AggregateSignature, AttestationData, BitList, ChainSpec, Domain, EthSpec, Fork, SecretKey, - Signature, SignedRoot, SubnetId, + Signature, SignedRoot, }; use crate::{test_utils::TestRandom, Hash256}; -use safe_arith::{ArithError, SafeArith}; - +use safe_arith::ArithError; use serde_derive::{Deserialize, Serialize}; use ssz_derive::{Decode, Encode}; use test_random_derive::TestRandom; @@ -84,18 +83,6 @@ impl Attestation { Ok(()) } } - - /// Returns the subnet id associated with the attestation. - /// - /// Note, this will return the subnet id for an aggregated attestation. This is done - /// to avoid checking aggregate bits every time we wish to get an id. - pub fn subnet_id(&self, spec: &ChainSpec) -> Result { - self.data - .index - .safe_rem(spec.attestation_subnet_count) - .map(SubnetId::new) - .map_err(Error::SubnetCountIsZero) - } } #[cfg(test)] diff --git a/consensus/types/src/beacon_block.rs b/consensus/types/src/beacon_block.rs index 2eee8b8e7d..75c957b91c 100644 --- a/consensus/types/src/beacon_block.rs +++ b/consensus/types/src/beacon_block.rs @@ -83,12 +83,12 @@ impl BeaconBlock { }; let proposer_slashing = ProposerSlashing { signed_header_1: signed_header.clone(), - signed_header_2: signed_header.clone(), + signed_header_2: signed_header, }; let attester_slashing = AttesterSlashing { attestation_1: indexed_attestation.clone(), - attestation_2: indexed_attestation.clone(), + attestation_2: indexed_attestation, }; let attestation: Attestation = Attestation { diff --git a/consensus/types/src/subnet_id.rs b/consensus/types/src/subnet_id.rs index 11e8e13377..80cc249776 100644 --- a/consensus/types/src/subnet_id.rs +++ b/consensus/types/src/subnet_id.rs @@ -1,4 +1,6 @@ //! Identifies each shard by an integer identifier. +use crate::{AttestationData, ChainSpec, CommitteeIndex, EthSpec, Slot}; +use safe_arith::{ArithError, SafeArith}; use serde_derive::{Deserialize, Serialize}; use std::ops::{Deref, DerefMut}; @@ -8,7 +10,42 @@ pub struct SubnetId(u64); impl SubnetId { pub fn new(id: u64) -> Self { - SubnetId(id) + id.into() + } + + /// Compute the subnet for an attestation with `attestation_data` where each slot in the + /// attestation epoch contains `committee_count_per_slot` committees. + pub fn compute_subnet_for_attestation_data( + attestation_data: &AttestationData, + committee_count_per_slot: u64, + spec: &ChainSpec, + ) -> Result { + Self::compute_subnet::( + attestation_data.slot, + attestation_data.index, + committee_count_per_slot, + spec, + ) + } + + /// Compute the subnet for an attestation with `attestation.data.slot == slot` and + /// `attestation.data.index == committee_index` where each slot in the attestation epoch + /// contains `committee_count_at_slot` committees. + pub fn compute_subnet( + slot: Slot, + committee_index: CommitteeIndex, + committee_count_at_slot: u64, + spec: &ChainSpec, + ) -> Result { + let slots_since_epoch_start: u64 = slot.as_u64().safe_rem(T::slots_per_epoch())?; + + let committees_since_epoch_start = + committee_count_at_slot.safe_mul(slots_since_epoch_start)?; + + Ok(committees_since_epoch_start + .safe_add(committee_index)? + .safe_rem(spec.attestation_subnet_count)? + .into()) } } @@ -25,3 +62,15 @@ impl DerefMut for SubnetId { &mut self.0 } } + +impl From for SubnetId { + fn from(x: u64) -> Self { + Self(x) + } +} + +impl Into for SubnetId { + fn into(self) -> u64 { + self.0 + } +} diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index b2426192bc..8a3823588d 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -5,13 +5,13 @@ use crate::{ use environment::RuntimeContext; use futures::StreamExt; use remote_beacon_node::{PublishStatus, RemoteBeaconNode}; -use slog::{crit, debug, info, trace}; +use slog::{crit, debug, error, info, trace}; use slot_clock::SlotClock; use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; use tokio::time::{delay_until, interval_at, Duration, Instant}; -use types::{Attestation, ChainSpec, CommitteeIndex, EthSpec, Slot}; +use types::{Attestation, ChainSpec, CommitteeIndex, EthSpec, Slot, SubnetId}; /// Builds an `AttestationService`. pub struct AttestationServiceBuilder { @@ -334,17 +334,22 @@ impl AttestationService { .iter() .filter_map(|duty| { // Ensure that all required fields are present in the validator duty. - let (duty_slot, duty_committee_index, validator_committee_position, _) = - if let Some(tuple) = duty.attestation_duties() { - tuple - } else { - crit!( - log, - "Missing validator duties when signing"; - "duties" => format!("{:?}", duty) - ); - return None; - }; + let ( + duty_slot, + duty_committee_index, + validator_committee_position, + _, + committee_count_at_slot, + ) = if let Some(tuple) = duty.attestation_duties() { + tuple + } else { + crit!( + log, + "Missing validator duties when signing"; + "duties" => format!("{:?}", duty) + ); + return None; + }; // Ensure that the attestation matches the duties. if duty_slot != attestation.data.slot @@ -363,7 +368,18 @@ impl AttestationService { } let mut attestation = attestation.clone(); - + let subnet_id = SubnetId::compute_subnet_for_attestation_data::( + &attestation.data, + committee_count_at_slot, + &self.context.eth2_config().spec, + ) + .map_err(|e| { + error!( + log, + "Failed to compute subnet id to publish attestation: {:?}", e + ) + }) + .ok()?; self.validator_store .sign_attestation( duty.validator_pubkey(), @@ -371,7 +387,7 @@ impl AttestationService { &mut attestation, current_epoch, ) - .map(|_| attestation) + .map(|_| (attestation, subnet_id)) }) .collect::>(); @@ -379,7 +395,7 @@ impl AttestationService { // just return early. if let Some(attestation) = signed_attestations.first().cloned() { let num_attestations = signed_attestations.len(); - let beacon_block_root = attestation.data.beacon_block_root; + let beacon_block_root = attestation.0.data.beacon_block_root; self.beacon_node .http @@ -409,7 +425,7 @@ impl AttestationService { crit!(log, "Unknown condition when publishing unagg. attestation") } }) - .map(|()| Some(attestation)) + .map(|()| Some(attestation.0)) } else { debug!( log, @@ -459,7 +475,7 @@ impl AttestationService { // subscribed aggregators. let selection_proof = duty_and_proof.selection_proof.as_ref()?.clone(); - let (duty_slot, duty_committee_index, _, validator_index) = + let (duty_slot, duty_committee_index, _, validator_index, _) = duty_and_proof.attestation_duties().or_else(|| { crit!(log, "Missing duties when signing aggregate"); None diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index b7c0c0876c..d68ccc0089 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -90,12 +90,13 @@ impl DutyAndProof { /// Returns the information required for an attesting validator, if they are scheduled to /// attest. - pub fn attestation_duties(&self) -> Option<(Slot, CommitteeIndex, usize, u64)> { + pub fn attestation_duties(&self) -> Option<(Slot, CommitteeIndex, usize, u64, u64)> { Some(( self.duty.attestation_slot?, self.duty.attestation_committee_index?, self.duty.attestation_committee_position?, self.duty.validator_index?, + self.duty.committee_count_at_slot?, )) } @@ -116,6 +117,7 @@ impl TryInto for ValidatorDutyBytes { attestation_slot: self.attestation_slot, attestation_committee_index: self.attestation_committee_index, attestation_committee_position: self.attestation_committee_position, + committee_count_at_slot: self.committee_count_at_slot, block_proposal_slots: self.block_proposal_slots, aggregator_modulo: self.aggregator_modulo, }; @@ -609,6 +611,7 @@ impl DutiesService { validator_index: remote_duties.validator_index?, attestation_committee_index: remote_duties.attestation_committee_index?, slot: remote_duties.attestation_slot?, + committee_count_at_slot: remote_duties.committee_count_at_slot?, is_aggregator, }) } else {