From 6786b9d12a6da2f6d16e451c241e4670738bc009 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Tue, 17 Jun 2025 12:01:26 +0300 Subject: [PATCH] Single attestation "Full" implementation (#7444) #6970 This allows for us to receive `SingleAttestation` over gossip and process it without converting. There is still a conversion to `Attestation` as a final step in the attestation verification process, but by then the `SingleAttestation` is fully verified. I've also fully removed the `submitPoolAttestationsV1` endpoint as its been deprecated I've also pre-emptively deprecated supporting `Attestation` in `submitPoolAttestationsV2` endpoint. See here for more info: https://github.com/ethereum/beacon-APIs/pull/531 I tried to the minimize the diff here by only making the "required" changes. There are some unnecessary complexities with the way we manage the different attestation verification wrapper types. We could probably consolidate this to one wrapper type and refactor this even further. We could leave that to a separate PR if we feel like cleaning things up in the future. Note that I've also updated the test harness to always submit `SingleAttestation` regardless of fork variant. I don't see a problem in that approach and it allows us to delete more code :) --- .../src/attestation_verification.rs | 263 +++++++++------- .../src/attestation_verification/batch.rs | 2 +- beacon_node/beacon_chain/src/beacon_chain.rs | 14 +- .../beacon_chain/src/single_attestation.rs | 52 ++-- beacon_node/beacon_chain/src/test_utils.rs | 55 +++- .../tests/attestation_verification.rs | 286 ++++++------------ beacon_node/beacon_chain/tests/store_tests.rs | 14 +- beacon_node/beacon_chain/tests/tests.rs | 4 +- beacon_node/beacon_processor/src/lib.rs | 29 +- beacon_node/http_api/src/lib.rs | 68 +---- .../http_api/src/publish_attestations.rs | 127 +------- beacon_node/http_api/tests/fork_tests.rs | 33 +- .../http_api/tests/interactive_tests.rs | 36 +-- beacon_node/http_api/tests/tests.rs | 241 ++++++++++++++- .../lighthouse_network/src/types/pubsub.rs | 53 +--- .../gossip_methods.rs | 209 ++----------- .../src/network_beacon_processor/mod.rs | 51 +--- .../src/network_beacon_processor/tests.rs | 12 +- beacon_node/network/src/router.rs | 11 - beacon_node/network/src/service.rs | 18 +- common/eth2/src/lib.rs | 49 +-- consensus/fork_choice/tests/tests.rs | 10 +- consensus/types/src/attestation.rs | 62 +++- .../src/attestation_service.rs | 59 ++-- 24 files changed, 777 insertions(+), 981 deletions(-) diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index d69667f3de..2f7c0be229 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -38,6 +38,7 @@ use crate::{ metrics, observed_aggregates::{ObserveOutcome, ObservedAttestationKey}, observed_attesters::Error as ObservedAttestersError, + single_attestation::single_attestation_to_attestation, BeaconChain, BeaconChainError, BeaconChainTypes, }; use bls::verify_signature_sets; @@ -202,12 +203,6 @@ pub enum Error { /// /// The peer has sent an invalid message. NoCommitteeForSlotAndIndex { slot: Slot, index: CommitteeIndex }, - /// The unaggregated attestation doesn't have only one aggregation bit set. - /// - /// ## Peer scoring - /// - /// The peer has sent an invalid message. - NotExactlyOneAggregationBitSet(usize), /// The attestation doesn't have only one aggregation bit set. /// /// ## Peer scoring @@ -304,9 +299,9 @@ struct IndexedAggregatedAttestation<'a, T: BeaconChainTypes> { /// /// These attestations have *not* undergone signature verification. struct IndexedUnaggregatedAttestation<'a, T: BeaconChainTypes> { - attestation: AttestationRef<'a, T::EthSpec>, + attestation: &'a SingleAttestation, indexed_attestation: IndexedAttestation, - subnet_id: SubnetId, + subnet_id: Option, validator_index: u64, } @@ -323,12 +318,13 @@ impl VerifiedAggregatedAttestation<'_, T> { } } +#[derive(Clone)] /// Wraps an `Attestation` that has been fully verified for propagation on the gossip network. pub struct VerifiedUnaggregatedAttestation<'a, T: BeaconChainTypes> { - attestation: AttestationRef<'a, T::EthSpec>, + attestation: Attestation, + single_attestation: &'a SingleAttestation, indexed_attestation: IndexedAttestation, subnet_id: SubnetId, - validator_index: usize, } impl VerifiedUnaggregatedAttestation<'_, T> { @@ -336,13 +332,8 @@ impl VerifiedUnaggregatedAttestation<'_, T> { self.indexed_attestation } - pub fn single_attestation(&self) -> Option { - Some(SingleAttestation { - committee_index: self.attestation.committee_index()?, - attester_index: self.validator_index as u64, - data: self.attestation.data().clone(), - signature: self.attestation.signature().clone(), - }) + pub fn single_attestation(&self) -> SingleAttestation { + self.single_attestation.clone() } } @@ -386,7 +377,7 @@ impl VerifiedAttestation for VerifiedAggregatedAttestati impl VerifiedAttestation for VerifiedUnaggregatedAttestation<'_, T> { fn attestation(&self) -> AttestationRef { - self.attestation + self.attestation.to_ref() } fn indexed_attestation(&self) -> &IndexedAttestation { @@ -400,6 +391,8 @@ pub enum AttestationSlashInfo<'a, T: BeaconChainTypes, TErr> { SignatureNotChecked(AttestationRef<'a, T::EthSpec>, TErr), /// As for `SignatureNotChecked`, but we know the `IndexedAttestation`. SignatureNotCheckedIndexed(IndexedAttestation, TErr), + /// As for `SignatureNotChecked`, but for the `SingleAttestation`. + SignatureNotCheckedSingle(&'a SingleAttestation, TErr), /// The attestation's signature is invalid, so it will never be slashable. SignatureInvalid(TErr), /// The signature is valid but the attestation is invalid in some other way. @@ -438,6 +431,20 @@ fn process_slash_info( } } } + SignatureNotCheckedSingle(attestation, err) => { + if let Error::UnknownHeadBlock { .. } = err { + if attestation.data.beacon_block_root == attestation.data.target.root { + return err; + } + } + + let fork_name = chain + .spec + .fork_name_at_slot::(attestation.data.slot); + + let indexed_attestation = attestation.to_indexed(fork_name); + (indexed_attestation, true, err) + } SignatureNotCheckedIndexed(indexed, err) => (indexed, true, err), SignatureInvalid(e) => return e, SignatureValid(indexed, err) => (indexed, false, err), @@ -461,6 +468,7 @@ fn process_slash_info( match slash_info { SignatureNotChecked(_, e) | SignatureNotCheckedIndexed(_, e) + | SignatureNotCheckedSingle(_, e) | SignatureInvalid(e) | SignatureValid(_, e) => e, } @@ -561,7 +569,7 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> { // // Attestations must be for a known block. If the block is unknown, we simply drop the // attestation and do not delay consideration for later. - let head_block = verify_head_block_is_known(chain, attestation, None)?; + let head_block = verify_head_block_is_known(chain, attestation.data(), None)?; // Check the attestation target root is consistent with the head root. // @@ -570,7 +578,7 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> { // // Whilst this attestation *technically* could be used to add value to a block, it is // invalid in the spirit of the protocol. Here we choose safety over profit. - verify_attestation_target_root::(&head_block, attestation)?; + verify_attestation_target_root::(&head_block, attestation.data())?; // Ensure that the attestation has participants. if attestation.is_aggregation_bits_zero() { @@ -813,16 +821,16 @@ impl<'a, T: BeaconChainTypes> VerifiedAggregatedAttestation<'a, T> { impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { /// Run the checks that happen before an indexed attestation is constructed. pub fn verify_early_checks( - attestation: AttestationRef, + attestation: &'a SingleAttestation, chain: &BeaconChain, ) -> Result<(), Error> { - let attestation_epoch = attestation.data().slot.epoch(T::EthSpec::slots_per_epoch()); + let attestation_epoch = attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()); // Check the attestation's epoch matches its target. - if attestation_epoch != attestation.data().target.epoch { + if attestation_epoch != attestation.data.target.epoch { return Err(Error::InvalidTargetEpoch { - slot: attestation.data().slot, - epoch: attestation.data().target.epoch, + slot: attestation.data.slot, + epoch: attestation.data.target.epoch, }); } @@ -832,61 +840,44 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { // We do not queue future attestations for later processing. verify_propagation_slot_range::<_, T::EthSpec>( &chain.slot_clock, - attestation.data(), + &attestation.data, &chain.spec, )?; - // Check to ensure that the attestation is "unaggregated". I.e., it has exactly one - // aggregation bit set. - let num_aggregation_bits = attestation.num_set_aggregation_bits(); - if num_aggregation_bits != 1 { - return Err(Error::NotExactlyOneAggregationBitSet(num_aggregation_bits)); + let fork_name = chain + .spec + .fork_name_at_slot::(attestation.data.slot); + if fork_name.electra_enabled() { + // [New in Electra:EIP7549] + if attestation.data.index != 0 { + return Err(Error::CommitteeIndexNonZero( + attestation.data.index as usize, + )); + } } - // [New in Electra:EIP7549] - verify_committee_index(attestation)?; - // Attestations must be for a known block. If the block is unknown, we simply drop the // attestation and do not delay consideration for later. // // Enforce a maximum skip distance for unaggregated attestations. - let head_block = - verify_head_block_is_known(chain, attestation, chain.config.import_max_skip_slots)?; + let head_block = verify_head_block_is_known( + chain, + &attestation.data, + chain.config.import_max_skip_slots, + )?; // Check the attestation target root is consistent with the head root. - verify_attestation_target_root::(&head_block, attestation)?; + verify_attestation_target_root::(&head_block, &attestation.data)?; Ok(()) } /// Run the checks that apply to the indexed attestation before the signature is checked. pub fn verify_middle_checks( - attestation: AttestationRef, - indexed_attestation: &IndexedAttestation, - committees_per_slot: u64, - subnet_id: Option, + attestation: &'a SingleAttestation, chain: &BeaconChain, - ) -> Result<(u64, SubnetId), Error> { - let expected_subnet_id = SubnetId::compute_subnet_for_attestation::( - attestation, - committees_per_slot, - &chain.spec, - ) - .map_err(BeaconChainError::from)?; - - // If a subnet was specified, ensure that subnet is correct. - if let Some(subnet_id) = subnet_id { - if subnet_id != expected_subnet_id { - return Err(Error::InvalidSubnetId { - received: subnet_id, - expected: expected_subnet_id, - }); - } - }; - - let validator_index = *indexed_attestation - .attesting_indices_first() - .ok_or(Error::NotExactlyOneAggregationBitSet(0))?; + ) -> Result { + let validator_index = attestation.attester_index; /* * The attestation is the first valid attestation received for the participating validator @@ -895,16 +886,16 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { if chain .observed_gossip_attesters .read() - .validator_has_been_observed(attestation.data().target.epoch, validator_index as usize) + .validator_has_been_observed(attestation.data.target.epoch, validator_index as usize) .map_err(BeaconChainError::from)? { return Err(Error::PriorAttestationKnown { validator_index, - epoch: attestation.data().target.epoch, + epoch: attestation.data.target.epoch, }); } - Ok((validator_index, expected_subnet_id)) + Ok(validator_index) } /// Returns `Ok(Self)` if the `attestation` is valid to be (re)published on the gossip @@ -913,11 +904,11 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { /// `subnet_id` is the subnet from which we received this attestation. This function will /// verify that it was received on the correct subnet. pub fn verify( - attestation: &'a Attestation, + attestation: &'a SingleAttestation, subnet_id: Option, chain: &BeaconChain, ) -> Result { - Self::verify_slashable(attestation.to_ref(), subnet_id, chain) + Self::verify_slashable(attestation, subnet_id, chain) .inspect(|verified_unaggregated| { if let Some(slasher) = chain.slasher.as_ref() { slasher.accept_attestation(verified_unaggregated.indexed_attestation.clone()); @@ -928,31 +919,23 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { /// Verify the attestation, producing extra information about whether it might be slashable. pub fn verify_slashable( - attestation: AttestationRef<'a, T::EthSpec>, + attestation: &'a SingleAttestation, subnet_id: Option, chain: &BeaconChain, ) -> Result> { use AttestationSlashInfo::*; if let Err(e) = Self::verify_early_checks(attestation, chain) { - return Err(SignatureNotChecked(attestation, e)); + return Err(SignatureNotCheckedSingle(attestation, e)); } - let (indexed_attestation, committees_per_slot) = - match obtain_indexed_attestation_and_committees_per_slot(chain, attestation) { - Ok(x) => x, - Err(e) => { - return Err(SignatureNotChecked(attestation, e)); - } - }; + let fork_name = chain + .spec + .fork_name_at_slot::(attestation.data.slot); - let (validator_index, expected_subnet_id) = match Self::verify_middle_checks( - attestation, - &indexed_attestation, - committees_per_slot, - subnet_id, - chain, - ) { + let indexed_attestation = attestation.to_indexed(fork_name); + + let validator_index = match Self::verify_middle_checks(attestation, chain) { Ok(t) => t, Err(e) => return Err(SignatureNotCheckedIndexed(indexed_attestation, e)), }; @@ -960,7 +943,7 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { Ok(Self { attestation, indexed_attestation, - subnet_id: expected_subnet_id, + subnet_id, validator_index, }) } @@ -977,10 +960,55 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> { /// Run the checks that apply after the signature has been checked. fn verify_late_checks( - attestation: AttestationRef, + attestation: &'a SingleAttestation, validator_index: u64, + subnet_id: Option, chain: &BeaconChain, - ) -> Result<(), Error> { + ) -> Result<(Attestation, SubnetId), Error> { + // Check that the attester is a member of the committee + let (committee_opt, committees_per_slot) = chain.with_committee_cache( + attestation.data.target.root, + attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()), + |committee_cache, _| { + let committee_opt = committee_cache + .get_beacon_committee(attestation.data.slot, attestation.committee_index) + .map(|beacon_committee| beacon_committee.committee.to_vec()); + + Ok((committee_opt, committee_cache.committees_per_slot())) + }, + )?; + + let Some(committee) = committee_opt else { + return Err(Error::NoCommitteeForSlotAndIndex { + slot: attestation.data.slot, + index: attestation.committee_index, + }); + }; + + if !committee.contains(&(attestation.attester_index as usize)) { + return Err(Error::AttesterNotInCommittee { + attester_index: attestation.attester_index, + committee_index: attestation.committee_index, + slot: attestation.data.slot, + }); + } + + let expected_subnet_id = SubnetId::compute_subnet_for_single_attestation::( + attestation, + committees_per_slot, + &chain.spec, + ) + .map_err(BeaconChainError::from)?; + + // If a subnet was specified, ensure that subnet is correct. + if let Some(subnet_id) = subnet_id { + if subnet_id != expected_subnet_id { + return Err(Error::InvalidSubnetId { + received: subnet_id, + expected: expected_subnet_id, + }); + } + }; // Now that the attestation has been fully verified, store that we have received a valid // attestation from this validator. // @@ -990,20 +1018,28 @@ impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> { if chain .observed_gossip_attesters .write() - .observe_validator(attestation.data().target.epoch, validator_index as usize) + .observe_validator(attestation.data.target.epoch, validator_index as usize) .map_err(BeaconChainError::from)? { return Err(Error::PriorAttestationKnown { validator_index, - epoch: attestation.data().target.epoch, + epoch: attestation.data.target.epoch, }); } - Ok(()) + + let fork_name = chain + .spec + .fork_name_at_slot::(attestation.data.slot); + + let unaggregated_attestation = + single_attestation_to_attestation(attestation, &committee, fork_name)?; + + Ok((unaggregated_attestation, expected_subnet_id)) } /// Verify the `unaggregated_attestation`. pub fn verify( - unaggregated_attestation: &'a Attestation, + unaggregated_attestation: &'a SingleAttestation, subnet_id: Option, chain: &BeaconChain, ) -> Result { @@ -1054,15 +1090,17 @@ impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> { CheckAttestationSignature::No => (), }; - if let Err(e) = Self::verify_late_checks(attestation, validator_index, chain) { - return Err(SignatureValid(indexed_attestation, e)); - } + let (unaggregated_attestation, subnet_id) = + match Self::verify_late_checks(attestation, validator_index, subnet_id, chain) { + Ok(a) => a, + Err(e) => return Err(SignatureValid(indexed_attestation, e)), + }; Ok(Self { - attestation, + single_attestation: attestation, + attestation: unaggregated_attestation, indexed_attestation, subnet_id, - validator_index: validator_index as usize, }) } @@ -1071,11 +1109,6 @@ impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> { self.subnet_id } - /// Returns the wrapped `attestation`. - pub fn attestation(&self) -> AttestationRef { - self.attestation - } - /// Returns the wrapped `indexed_attestation`. pub fn indexed_attestation(&self) -> &IndexedAttestation { &self.indexed_attestation @@ -1102,40 +1135,40 @@ impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> { /// already finalized. fn verify_head_block_is_known( chain: &BeaconChain, - attestation: AttestationRef, + attestation_data: &AttestationData, max_skip_slots: Option, ) -> Result { let block_opt = chain .canonical_head .fork_choice_read_lock() - .get_block(&attestation.data().beacon_block_root) + .get_block(&attestation_data.beacon_block_root) .or_else(|| { chain .early_attester_cache - .get_proto_block(attestation.data().beacon_block_root) + .get_proto_block(attestation_data.beacon_block_root) }); if let Some(block) = block_opt { // Reject any block that exceeds our limit on skipped slots. if let Some(max_skip_slots) = max_skip_slots { - if attestation.data().slot > block.slot + max_skip_slots { + if attestation_data.slot > block.slot + max_skip_slots { return Err(Error::TooManySkippedSlots { head_block_slot: block.slot, - attestation_slot: attestation.data().slot, + attestation_slot: attestation_data.slot, }); } } - if !verify_attestation_is_finalized_checkpoint_or_descendant(attestation.data(), chain) { + if !verify_attestation_is_finalized_checkpoint_or_descendant(attestation_data, chain) { return Err(Error::HeadBlockFinalized { - beacon_block_root: attestation.data().beacon_block_root, + beacon_block_root: attestation_data.beacon_block_root, }); } Ok(block) - } else if chain.is_pre_finalization_block(attestation.data().beacon_block_root)? { + } else if chain.is_pre_finalization_block(attestation_data.beacon_block_root)? { Err(Error::HeadBlockFinalized { - beacon_block_root: attestation.data().beacon_block_root, + beacon_block_root: attestation_data.beacon_block_root, }) } else { // The block is either: @@ -1145,7 +1178,7 @@ fn verify_head_block_is_known( // 2) A post-finalization block that we don't know about yet. We'll queue // the attestation until the block becomes available (or we time out). Err(Error::UnknownHeadBlock { - beacon_block_root: attestation.data().beacon_block_root, + beacon_block_root: attestation_data.beacon_block_root, }) } } @@ -1237,11 +1270,11 @@ pub fn verify_attestation_signature( /// `attestation.data.beacon_block_root`. pub fn verify_attestation_target_root( head_block: &ProtoBlock, - attestation: AttestationRef, + attestation_data: &AttestationData, ) -> Result<(), Error> { // Check the attestation target root. let head_block_epoch = head_block.slot.epoch(E::slots_per_epoch()); - let attestation_epoch = attestation.data().slot.epoch(E::slots_per_epoch()); + let attestation_epoch = attestation_data.slot.epoch(E::slots_per_epoch()); if head_block_epoch > attestation_epoch { // The epoch references an invalid head block from a future epoch. // @@ -1254,7 +1287,7 @@ pub fn verify_attestation_target_root( // Reference: // https://github.com/ethereum/eth2.0-specs/pull/2001#issuecomment-699246659 return Err(Error::InvalidTargetRoot { - attestation: attestation.data().target.root, + attestation: attestation_data.target.root, // It is not clear what root we should expect in this case, since the attestation is // fundamentally invalid. expected: None, @@ -1273,9 +1306,9 @@ pub fn verify_attestation_target_root( }; // Reject any attestation with an invalid target root. - if target_root != attestation.data().target.root { + if target_root != attestation_data.target.root { return Err(Error::InvalidTargetRoot { - attestation: attestation.data().target.root, + attestation: attestation_data.target.root, expected: Some(target_root), }); } diff --git a/beacon_node/beacon_chain/src/attestation_verification/batch.rs b/beacon_node/beacon_chain/src/attestation_verification/batch.rs index 5f856140ba..266279432e 100644 --- a/beacon_node/beacon_chain/src/attestation_verification/batch.rs +++ b/beacon_node/beacon_chain/src/attestation_verification/batch.rs @@ -136,7 +136,7 @@ pub fn batch_verify_unaggregated_attestations<'a, T, I>( ) -> Result, Error>>, Error> where T: BeaconChainTypes, - I: Iterator, Option)> + ExactSizeIterator, + I: Iterator)> + ExactSizeIterator, { let mut num_partially_verified = 0; let mut num_failed = 0; diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index ef741f7b5b..0a4c5b4483 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2074,7 +2074,7 @@ impl BeaconChain { AttestationError, > where - I: Iterator, Option)> + ExactSizeIterator, + I: Iterator)> + ExactSizeIterator, { batch_verify_unaggregated_attestations(attestations, self) } @@ -2086,7 +2086,7 @@ impl BeaconChain { /// aggregation bit set. pub fn verify_unaggregated_attestation_for_gossip<'a>( &self, - unaggregated_attestation: &'a Attestation, + unaggregated_attestation: &'a SingleAttestation, subnet_id: Option, ) -> Result, AttestationError> { metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_REQUESTS); @@ -2102,13 +2102,9 @@ impl BeaconChain { .spec .fork_name_at_slot::(v.attestation().data().slot); if current_fork.electra_enabled() { - // I don't see a situation where this could return None. The upstream unaggregated attestation checks - // should have already verified that this is an attestation with a single committee bit set. - if let Some(single_attestation) = v.single_attestation() { - event_handler.register(EventKind::SingleAttestation(Box::new( - single_attestation, - ))); - } + event_handler.register(EventKind::SingleAttestation(Box::new( + v.single_attestation(), + ))); } } diff --git a/beacon_node/beacon_chain/src/single_attestation.rs b/beacon_node/beacon_chain/src/single_attestation.rs index fa4f98bb07..33a093687e 100644 --- a/beacon_node/beacon_chain/src/single_attestation.rs +++ b/beacon_node/beacon_chain/src/single_attestation.rs @@ -1,9 +1,13 @@ use crate::attestation_verification::Error; -use types::{Attestation, AttestationElectra, BitList, BitVector, EthSpec, SingleAttestation}; +use types::{ + Attestation, AttestationBase, AttestationElectra, BitList, BitVector, EthSpec, ForkName, + SingleAttestation, +}; pub fn single_attestation_to_attestation( single_attestation: &SingleAttestation, committee: &[usize], + fork_name: ForkName, ) -> Result, Error> { let attester_index = single_attestation.attester_index; let committee_index = single_attestation.committee_index; @@ -24,23 +28,33 @@ pub fn single_attestation_to_attestation( slot, })?; - let mut committee_bits: BitVector = BitVector::default(); - committee_bits - .set(committee_index as usize, true) - .map_err(|e| Error::Invalid(e.into()))?; + if fork_name.electra_enabled() { + let mut committee_bits: BitVector = BitVector::default(); + committee_bits + .set(committee_index as usize, true) + .map_err(|e| Error::Invalid(e.into()))?; - let mut aggregation_bits = - BitList::with_capacity(committee.len()).map_err(|e| Error::Invalid(e.into()))?; - aggregation_bits - .set(aggregation_bit, true) - .map_err(|e| Error::Invalid(e.into()))?; - - // TODO(electra): consider eventually allowing conversion to non-Electra attestations as well - // to maintain invertability (`Attestation` -> `SingleAttestation` -> `Attestation`). - Ok(Attestation::Electra(AttestationElectra { - aggregation_bits, - committee_bits, - data: single_attestation.data.clone(), - signature: single_attestation.signature.clone(), - })) + let mut aggregation_bits = + BitList::with_capacity(committee.len()).map_err(|e| Error::Invalid(e.into()))?; + aggregation_bits + .set(aggregation_bit, true) + .map_err(|e| Error::Invalid(e.into()))?; + Ok(Attestation::Electra(AttestationElectra { + aggregation_bits, + committee_bits, + data: single_attestation.data.clone(), + signature: single_attestation.signature.clone(), + })) + } else { + let mut aggregation_bits = + BitList::with_capacity(committee.len()).map_err(|e| Error::Invalid(e.into()))?; + aggregation_bits + .set(aggregation_bit, true) + .map_err(|e| Error::Invalid(e.into()))?; + Ok(Attestation::Base(AttestationBase { + aggregation_bits, + data: single_attestation.data.clone(), + signature: single_attestation.signature.clone(), + })) + } } diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index c2c5d8d626..99341f54af 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -1118,9 +1118,14 @@ where attn.aggregation_bits .set(aggregation_bit_index, true) .unwrap(); - attn + Attestation::Electra(attn) + } + Attestation::Base(mut attn) => { + attn.aggregation_bits + .set(aggregation_bit_index, true) + .unwrap(); + Attestation::Base(attn) } - Attestation::Base(_) => panic!("Must be an Electra attestation"), }; let aggregation_bits = attestation.get_aggregation_bits(); @@ -1148,8 +1153,10 @@ where let single_attestation = attestation.to_single_attestation_with_attester_index(attester_index as u64)?; + let fork_name = self.spec.fork_name_at_slot::(attestation.data().slot); let attestation: Attestation = - single_attestation_to_attestation(&single_attestation, committee.committee).unwrap(); + single_attestation_to_attestation(&single_attestation, committee.committee, fork_name) + .unwrap(); assert_eq!( single_attestation.committee_index, @@ -2407,7 +2414,11 @@ where }) } - pub fn process_attestations(&self, attestations: HarnessAttestations) { + pub fn process_attestations( + &self, + attestations: HarnessAttestations, + state: &BeaconState, + ) { let num_validators = self.validator_keypairs.len(); let mut unaggregated = Vec::with_capacity(num_validators); // This is an over-allocation, but it should be fine. It won't be *that* memory hungry and @@ -2416,7 +2427,35 @@ where for (unaggregated_attestations, maybe_signed_aggregate) in attestations.iter() { for (attn, subnet) in unaggregated_attestations { - unaggregated.push((attn, Some(*subnet))); + let aggregation_bits = attn.get_aggregation_bits(); + + if aggregation_bits.len() != 1 { + panic!("Must be an unaggregated attestation") + } + + let aggregation_bit = *aggregation_bits.first().unwrap(); + + let committee = state + .get_beacon_committee(attn.data().slot, attn.committee_index().unwrap()) + .unwrap(); + + let attester_index = committee + .committee + .iter() + .enumerate() + .find_map(|(i, &index)| { + if aggregation_bit as usize == i { + return Some(index); + } + None + }) + .unwrap(); + + let single_attestation = attn + .to_single_attestation_with_attester_index(attester_index as u64) + .unwrap(); + + unaggregated.push((single_attestation, Some(*subnet))); } if let Some(a) = maybe_signed_aggregate { @@ -2426,7 +2465,9 @@ where for result in self .chain - .batch_verify_unaggregated_attestations_for_gossip(unaggregated.into_iter()) + .batch_verify_unaggregated_attestations_for_gossip( + unaggregated.iter().map(|(attn, subnet)| (attn, *subnet)), + ) .unwrap() { let verified = result.unwrap(); @@ -2493,7 +2534,7 @@ where ) { let attestations = self.make_attestations(validators, state, state_root, block_hash, block.slot()); - self.process_attestations(attestations); + self.process_attestations(attestations, state); } pub fn sync_committee_sign_block( diff --git a/beacon_node/beacon_chain/tests/attestation_verification.rs b/beacon_node/beacon_chain/tests/attestation_verification.rs index 30eec539fc..11729f8d8a 100644 --- a/beacon_node/beacon_chain/tests/attestation_verification.rs +++ b/beacon_node/beacon_chain/tests/attestation_verification.rs @@ -8,24 +8,22 @@ use beacon_chain::test_utils::{MakeAttestationOptions, HARNESS_GENESIS_TIME}; use beacon_chain::{ attestation_verification::Error as AttnError, test_utils::{ - test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, + single_attestation_to_attestation, test_spec, AttestationStrategy, BeaconChainHarness, + BlockStrategy, EphemeralHarnessType, }, BeaconChain, BeaconChainError, BeaconChainTypes, ChainConfig, WhenSlotSkipped, }; use genesis::{interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH}; use int_to_bytes::int_to_bytes32; -use ssz_types::BitVector; -use state_processing::{ - per_block_processing::errors::AttestationValidationError, per_slot_processing, -}; +use state_processing::per_slot_processing; use std::sync::{Arc, LazyLock}; use tree_hash::TreeHash; use types::{ signed_aggregate_and_proof::SignedAggregateAndProofRefMut, test_utils::generate_deterministic_keypair, Address, AggregateSignature, Attestation, - AttestationRef, AttestationRefMut, BeaconStateError, BitList, ChainSpec, Epoch, EthSpec, - FixedBytesExtended, ForkName, Hash256, Keypair, MainnetEthSpec, SecretKey, SelectionProof, - SignedAggregateAndProof, Slot, SubnetId, Unsigned, + AttestationRef, ChainSpec, Epoch, EthSpec, FixedBytesExtended, ForkName, Hash256, Keypair, + MainnetEthSpec, SecretKey, SelectionProof, SignedAggregateAndProof, SingleAttestation, Slot, + SubnetId, Unsigned, }; pub type E = MainnetEthSpec; @@ -122,7 +120,7 @@ fn get_harness_capella_spec( /// Also returns some info about who created it. fn get_valid_unaggregated_attestation( chain: &BeaconChain, -) -> (Attestation, usize, usize, SecretKey, SubnetId) { +) -> (SingleAttestation, SecretKey, SubnetId) { let head = chain.head_snapshot(); let current_slot = chain.slot().expect("should get slot"); @@ -156,8 +154,15 @@ fn get_valid_unaggregated_attestation( ) .expect("should sign attestation"); - let subnet_id = SubnetId::compute_subnet_for_attestation::( - valid_attestation.to_ref(), + let single_attestation = SingleAttestation { + committee_index: valid_attestation.committee_index().unwrap(), + attester_index: validator_index as u64, + data: valid_attestation.data().clone(), + signature: valid_attestation.signature().clone(), + }; + + let subnet_id = SubnetId::compute_subnet_for_single_attestation::( + &single_attestation, head.beacon_state .get_committee_count_at_slot(current_slot) .expect("should get committee count"), @@ -165,13 +170,7 @@ fn get_valid_unaggregated_attestation( ) .expect("should get subnet_id"); - ( - valid_attestation, - validator_index, - validator_committee_index, - validator_sk, - subnet_id, - ) + (single_attestation, validator_sk, subnet_id) } fn get_valid_aggregated_attestation( @@ -275,15 +274,13 @@ struct GossipTester { /* * Valid unaggregated attestation */ - valid_attestation: Attestation, - attester_validator_index: usize, - attester_committee_index: usize, + valid_attestation: SingleAttestation, attester_sk: SecretKey, attestation_subnet_id: SubnetId, /* * Valid unaggregated attestation for batch testing */ - invalid_attestation: Attestation, + invalid_attestation: SingleAttestation, /* * Valid aggregate */ @@ -312,22 +309,33 @@ impl GossipTester { // Advance into a slot where there have not been blocks or attestations produced. harness.advance_slot(); - let ( - valid_attestation, - attester_validator_index, - attester_committee_index, - attester_sk, - attestation_subnet_id, - ) = get_valid_unaggregated_attestation(&harness.chain); + let (valid_attestation, attester_sk, attestation_subnet_id) = + get_valid_unaggregated_attestation(&harness.chain); + + let head = harness.chain.head_snapshot(); + let state = &head.beacon_state; + let committee = state + .get_beacon_committee( + valid_attestation.data.slot, + valid_attestation.committee_index, + ) + .unwrap(); + let fork_name = harness + .chain + .spec + .fork_name_at_slot::(valid_attestation.data.slot); + let valid_aggregate_attestation = + single_attestation_to_attestation(&valid_attestation, committee.committee, fork_name) + .unwrap(); let (valid_aggregate, aggregator_validator_index, aggregator_sk) = - get_valid_aggregated_attestation(&harness.chain, valid_attestation.clone()); + get_valid_aggregated_attestation(&harness.chain, valid_aggregate_attestation.clone()); let mut invalid_attestation = valid_attestation.clone(); - invalid_attestation.data_mut().beacon_block_root = Hash256::repeat_byte(13); + invalid_attestation.data.beacon_block_root = Hash256::repeat_byte(13); let (mut invalid_aggregate, _, _) = - get_valid_aggregated_attestation(&harness.chain, invalid_attestation.clone()); + get_valid_aggregated_attestation(&harness.chain, valid_aggregate_attestation.clone()); match invalid_aggregate.to_mut() { SignedAggregateAndProofRefMut::Base(att) => { @@ -341,8 +349,6 @@ impl GossipTester { Self { harness, valid_attestation, - attester_validator_index, - attester_committee_index, attester_sk, attestation_subnet_id, invalid_attestation, @@ -467,12 +473,12 @@ impl GossipTester { pub fn inspect_unaggregate_err(self, desc: &str, get_attn: G, inspect_err: I) -> Self where - G: Fn(&Self, &mut Attestation, &mut SubnetId), + G: Fn(&Self, &mut SingleAttestation, &mut SubnetId, &ChainSpec), I: Fn(&Self, AttnError), { let mut attn = self.valid_attestation.clone(); let mut subnet_id = self.attestation_subnet_id; - get_attn(&self, &mut attn, &mut subnet_id); + get_attn(&self, &mut attn, &mut subnet_id, &self.harness.spec); /* * Individual verification @@ -912,32 +918,20 @@ async fn unaggregated_gossip_verification() { */ .inspect_unaggregate_err( "attestation with invalid committee index", - |tester, a, _| { - match a.to_mut() { - AttestationRefMut::Base(attn) => { - attn.data.index = tester - .harness - .chain - .head_snapshot() - .beacon_state - .get_committee_count_at_slot(attn.data.slot) - .unwrap(); - } - AttestationRefMut::Electra(attn) => { - let committee_index = tester - .harness - .chain - .head_snapshot() - .beacon_state - .get_committee_count_at_slot(attn.data.slot) - .unwrap(); - // overwrite the existing committee bits before setting - attn.committee_bits = BitVector::default(); - attn.committee_bits.set(committee_index as usize, true).unwrap(); - } - } + |tester, a, _, _| { + let committee_index = tester + .harness + .chain + .head_snapshot() + .beacon_state + .get_committee_count_at_slot(a.data.slot) + .unwrap(); + + a.committee_index = committee_index; + }, + |_, err| { + assert!(matches!(err, AttnError::NoCommitteeForSlotAndIndex { .. })) }, - |_, err| assert!(matches!(err, AttnError::NoCommitteeForSlotAndIndex { .. })), ) /* * The following test ensures: @@ -946,8 +940,8 @@ async fn unaggregated_gossip_verification() { * attestation.data.slot, attestation.data.index) == subnet_id). */ .inspect_unaggregate_err( - "attestation with invalid committee index", - |_, _, subnet_id| *subnet_id = SubnetId::new(42), + "attestation with invalid subnet_id", + |_, _, subnet_id, _| *subnet_id = SubnetId::new(42), |tester, err| { assert!(matches!( err, @@ -969,7 +963,7 @@ async fn unaggregated_gossip_verification() { */ .inspect_unaggregate_err( "attestation from future slot", - |tester, a, _| a.data_mut().slot = tester.slot() + 1, + |tester, a, _, _| a.data.slot = tester.slot() + 1, |tester, err| { assert!(matches!( err, @@ -983,10 +977,10 @@ async fn unaggregated_gossip_verification() { ) .inspect_unaggregate_err( "attestation from past slot", - |tester, a, _| { + |tester, a, _, _| { let too_early_slot = tester.earliest_valid_attestation_slot() - 1; - a.data_mut().slot = too_early_slot; - a.data_mut().target.epoch = too_early_slot.epoch(E::slots_per_epoch()); + a.data.slot = too_early_slot; + a.data.target.epoch = too_early_slot.epoch(E::slots_per_epoch()); }, |tester, err| { let valid_early_slot = tester.earliest_valid_attestation_slot(); @@ -1010,7 +1004,7 @@ async fn unaggregated_gossip_verification() { */ .inspect_unaggregate_err( "attestation with invalid target epoch", - |_, a, _| a.data_mut().target.epoch += 1, + |_, a, _, _| a.data.target.epoch += 1, |_, err| { assert!(matches!( err, @@ -1018,104 +1012,6 @@ async fn unaggregated_gossip_verification() { )) }, ) - /* - * The following two tests ensure: - * - * The attestation is unaggregated -- that is, it has exactly one participating validator - * (len([bit for bit in attestation.aggregation_bits if bit == 0b1]) == 1). - */ - .inspect_unaggregate_err( - "attestation without any aggregation bits set", - |tester, mut a, _| { - match &mut a { - Attestation::Base(ref mut att) => { - att.aggregation_bits - .set(tester.attester_committee_index, false) - .expect("should unset aggregation bit"); - assert_eq!( - att.aggregation_bits.num_set_bits(), - 0, - "test requires no set bits" - ); - } - Attestation::Electra(ref mut att) => { - att.aggregation_bits - .set(tester.attester_committee_index, false) - .expect("should unset aggregation bit"); - assert_eq!( - att.aggregation_bits.num_set_bits(), - 0, - "test requires no set bits" - ); - } - } - }, - |_, err| { - assert!(matches!( - err, - AttnError::NotExactlyOneAggregationBitSet(0) - )) - }, - ) - .inspect_unaggregate_err( - "attestation with two aggregation bits set", - |tester, mut a, _| { - match &mut a { - Attestation::Base(ref mut att) => { - att.aggregation_bits - .set(tester.attester_committee_index + 1, true) - .expect("should set second aggregation bit"); - } - Attestation::Electra(ref mut att) => { - att.aggregation_bits - .set(tester.attester_committee_index + 1, true) - .expect("should set second aggregation bit"); - } - } - }, - |_, err| { - assert!(matches!( - err, - AttnError::NotExactlyOneAggregationBitSet(2) - )) - }, - ) - /* - * The following test ensures: - * - * The number of aggregation bits matches the committee size -- i.e. - * `len(attestation.aggregation_bits) == len(get_beacon_committee(state, data.slot, - * data.index))`. - */ - .inspect_unaggregate_err( - "attestation with invalid bitfield", - |_, mut a, _| { - match &mut a { - Attestation::Base(ref mut att) => { - let bits = att.aggregation_bits.iter().collect::>(); - att.aggregation_bits = BitList::with_capacity(bits.len() + 1).unwrap(); - for (i, bit) in bits.into_iter().enumerate() { - att.aggregation_bits.set(i, bit).unwrap(); - } - } - Attestation::Electra(ref mut att) => { - let bits = att.aggregation_bits.iter().collect::>(); - att.aggregation_bits = BitList::with_capacity(bits.len() + 1).unwrap(); - for (i, bit) in bits.into_iter().enumerate() { - att.aggregation_bits.set(i, bit).unwrap(); - } - } - } - }, - |_, err| { - assert!(matches!( - err, - AttnError::Invalid(AttestationValidationError::BeaconStateError( - BeaconStateError::InvalidBitfield - )) - )) - }, - ) /* * The following test ensures that: * @@ -1123,8 +1019,8 @@ async fn unaggregated_gossip_verification() { */ .inspect_unaggregate_err( "attestation with unknown head block", - |_, a, _| { - a.data_mut().beacon_block_root = Hash256::repeat_byte(42); + |_, a, _, _| { + a.data.beacon_block_root = Hash256::repeat_byte(42); }, |_, err| { assert!(matches!( @@ -1145,8 +1041,8 @@ async fn unaggregated_gossip_verification() { */ .inspect_unaggregate_err( "attestation with invalid target root", - |_, a, _| { - a.data_mut().target.root = Hash256::repeat_byte(42); + |_, a, _, _| { + a.data.target.root = Hash256::repeat_byte(42); }, |_, err| { assert!(matches!( @@ -1162,10 +1058,10 @@ async fn unaggregated_gossip_verification() { */ .inspect_unaggregate_err( "attestation with bad signature", - |tester, a, _| { + |tester, a, _, _| { let mut agg_sig = AggregateSignature::infinity(); agg_sig.add_assign(&tester.attester_sk.sign(Hash256::repeat_byte(42))); - *a.signature_mut() = agg_sig; + a.signature = agg_sig; }, |_, err| { assert!(matches!( @@ -1186,7 +1082,7 @@ async fn unaggregated_gossip_verification() { */ .inspect_unaggregate_err( "attestation that has already been seen", - |_, _, _| {}, + |_, _, _, _| {}, |tester, err| { assert!(matches!( err, @@ -1194,7 +1090,7 @@ async fn unaggregated_gossip_verification() { validator_index, epoch, } - if validator_index == tester.attester_validator_index as u64 && epoch == tester.epoch() + if validator_index == tester.valid_attestation.attester_index && epoch == tester.epoch() )) }, ); @@ -1243,7 +1139,7 @@ async fn attestation_that_skips_epochs() { let state_root = state.update_tree_hash_cache().unwrap(); let (attestation, subnet_id) = harness - .get_unaggregated_attestations( + .get_single_attestations( &AttestationStrategy::AllValidators, &state, state_root, @@ -1256,7 +1152,7 @@ async fn attestation_that_skips_epochs() { .cloned() .expect("should have at least one attestation in committee"); - let block_root = attestation.data().beacon_block_root; + let block_root = attestation.data.beacon_block_root; let block_slot = harness .chain .store @@ -1267,7 +1163,7 @@ async fn attestation_that_skips_epochs() { .slot(); assert!( - attestation.data().slot - block_slot > E::slots_per_epoch() * 2, + attestation.data.slot - block_slot > E::slots_per_epoch() * 2, "the attestation must skip more than two epochs" ); @@ -1357,7 +1253,7 @@ async fn attestation_validator_receive_proposer_reward_and_withdrawals() { // Verifying the attestation triggers an inconsistent state replay. let remaining_attesters = (two_thirds..VALIDATOR_COUNT).collect(); let (attestation, subnet_id) = harness - .get_unaggregated_attestations( + .get_single_attestations( &AttestationStrategy::SomeValidators(remaining_attesters), &state, state_root, @@ -1426,7 +1322,7 @@ async fn attestation_to_finalized_block() { let state_root = state.update_tree_hash_cache().unwrap(); let (attestation, subnet_id) = harness - .get_unaggregated_attestations( + .get_single_attestations( &AttestationStrategy::AllValidators, &state, state_root, @@ -1438,7 +1334,7 @@ async fn attestation_to_finalized_block() { .first() .cloned() .expect("should have at least one attestation in committee"); - assert_eq!(attestation.data().beacon_block_root, earlier_block_root); + assert_eq!(attestation.data.beacon_block_root, earlier_block_root); // Attestation should be rejected for attesting to a pre-finalization block. let res = harness @@ -1481,8 +1377,23 @@ async fn verify_aggregate_for_gossip_doppelganger_detection() { "the test requires a new epoch to avoid already-seen errors" ); - let (valid_attestation, _attester_index, _attester_committee_index, _, _) = - get_valid_unaggregated_attestation(&harness.chain); + let (valid_attestation, _, _) = get_valid_unaggregated_attestation(&harness.chain); + + let head = harness.chain.head_snapshot(); + let state = &head.beacon_state; + let committee = state + .get_beacon_committee( + valid_attestation.data.slot, + valid_attestation.committee_index, + ) + .unwrap(); + let fork_name = harness + .chain + .spec + .fork_name_at_slot::(valid_attestation.data.slot); + let valid_attestation = + single_attestation_to_attestation(&valid_attestation, committee.committee, fork_name) + .unwrap(); let (valid_aggregate, _, _) = get_valid_aggregated_attestation(&harness.chain, valid_attestation); @@ -1540,15 +1451,16 @@ async fn verify_attestation_for_gossip_doppelganger_detection() { "the test requires a new epoch to avoid already-seen errors" ); - let (valid_attestation, index, _attester_committee_index, _, subnet_id) = - get_valid_unaggregated_attestation(&harness.chain); + let (valid_attestation, _, subnet_id) = get_valid_unaggregated_attestation(&harness.chain); + + let index = valid_attestation.attester_index as usize; harness .chain .verify_unaggregated_attestation_for_gossip(&valid_attestation, Some(subnet_id)) .expect("should verify attestation"); - let epoch = valid_attestation.data().target.epoch; + let epoch = valid_attestation.data.target.epoch; assert!(harness.chain.validator_seen_at_epoch(index, epoch)); // Check the correct beacon cache is populated @@ -1612,7 +1524,7 @@ async fn attestation_verification_use_head_state_fork() { let attesters = (0..VALIDATOR_COUNT / 2).collect::>(); let capella_fork = spec.fork_for_name(ForkName::Capella).unwrap(); let committee_attestations = harness - .make_unaggregated_attestations_with_opts( + .make_single_attestations_with_opts( attesters.as_slice(), &state, state_root, @@ -1642,7 +1554,7 @@ async fn attestation_verification_use_head_state_fork() { let attesters = (VALIDATOR_COUNT / 2..VALIDATOR_COUNT).collect::>(); let bellatrix_fork = spec.fork_for_name(ForkName::Bellatrix).unwrap(); let committee_attestations = harness - .make_unaggregated_attestations_with_opts( + .make_single_attestations_with_opts( attesters.as_slice(), &state, state_root, diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index d0f161ed56..e399339545 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -491,7 +491,7 @@ async fn epoch_boundary_state_attestation_processing() { .await; let head = harness.chain.head_snapshot(); - late_attestations.extend(harness.get_unaggregated_attestations( + late_attestations.extend(harness.get_single_attestations( &AttestationStrategy::SomeValidators(late_validators.clone()), &head.beacon_state, head.beacon_state_root(), @@ -511,7 +511,7 @@ async fn epoch_boundary_state_attestation_processing() { for (attestation, subnet_id) in late_attestations.into_iter().flatten() { // load_epoch_boundary_state is idempotent! - let block_root = attestation.data().beacon_block_root; + let block_root = attestation.data.beacon_block_root; let block = store .get_blinded_block(&block_root) .unwrap() @@ -536,7 +536,7 @@ async fn epoch_boundary_state_attestation_processing() { .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id)); let current_slot = harness.chain.slot().expect("should get slot"); - let expected_attestation_slot = attestation.data().slot; + let expected_attestation_slot = attestation.data.slot; // Extra -1 to handle gossip clock disparity. let expected_earliest_permissible_slot = current_slot - E::slots_per_epoch() - 1; @@ -2704,7 +2704,7 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() { slot, ); harness.advance_slot(); - harness.process_attestations(attestations); + harness.process_attestations(attestations, &advanced_split_state); } } @@ -2866,8 +2866,8 @@ async fn revert_minority_fork_on_resume() { ); harness1.set_current_slot(slot); harness2.set_current_slot(slot); - harness1.process_attestations(attestations.clone()); - harness2.process_attestations(attestations); + harness1.process_attestations(attestations.clone(), &state); + harness2.process_attestations(attestations, &state); let ((block, blobs), new_state) = harness1.make_block(state, slot).await; @@ -2907,7 +2907,7 @@ async fn revert_minority_fork_on_resume() { slot, ); harness2.set_current_slot(slot); - harness2.process_attestations(attestations); + harness2.process_attestations(attestations, &state2); // Minority chain block (no attesters). let ((block1, blobs1), new_state1) = harness1.make_block(state1, slot).await; diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index be34248d95..55ef3dc279 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -580,7 +580,7 @@ async fn attestations_with_increasing_slots() { let head = harness.chain.head_snapshot(); let head_state_root = head.beacon_state_root(); - attestations.extend(harness.get_unaggregated_attestations( + attestations.extend(harness.get_single_attestations( &AttestationStrategy::AllValidators, &head.beacon_state, head_state_root, @@ -597,7 +597,7 @@ async fn attestations_with_increasing_slots() { .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id)); let current_slot = harness.chain.slot().expect("should get slot"); - let expected_attestation_slot = attestation.data().slot; + let expected_attestation_slot = attestation.data.slot; let expected_earliest_permissible_slot = current_slot - MinimalEthSpec::slots_per_epoch() - 1; diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 3acc11b1d2..659cb808d5 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -63,7 +63,7 @@ use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; use tracing::{debug, error, trace, warn}; use types::{ - Attestation, BeaconState, ChainSpec, EthSpec, Hash256, RelativeEpoch, SignedAggregateAndProof, + BeaconState, ChainSpec, EthSpec, Hash256, RelativeEpoch, SignedAggregateAndProof, SingleAttestation, Slot, SubnetId, }; use work_reprocessing_queue::{ @@ -557,32 +557,23 @@ pub enum BlockingOrAsync { Blocking(BlockingFn), Async(AsyncFn), } -pub type GossipAttestationBatch = Vec>>; +pub type GossipAttestationBatch = Vec>; /// Indicates the type of work to be performed and therefore its priority and /// queuing specifics. pub enum Work { GossipAttestation { - attestation: Box>>, - process_individual: Box>) + Send + Sync>, - process_batch: Box) + Send + Sync>, - }, - // Attestation requiring conversion before processing. - // - // For now this is a `SingleAttestation`, but eventually we will switch this around so that - // legacy `Attestation`s are converted and the main processing pipeline operates on - // `SingleAttestation`s. - GossipAttestationToConvert { attestation: Box>, process_individual: Box) + Send + Sync>, + process_batch: Box, }, UnknownBlockAttestation { process_fn: BlockingFn, }, GossipAttestationBatch { - attestations: GossipAttestationBatch, - process_batch: Box) + Send + Sync>, + attestations: GossipAttestationBatch, + process_batch: Box, }, GossipAggregate { aggregate: Box>, @@ -712,7 +703,6 @@ impl Work { fn to_type(&self) -> WorkType { match self { Work::GossipAttestation { .. } => WorkType::GossipAttestation, - Work::GossipAttestationToConvert { .. } => WorkType::GossipAttestationToConvert, Work::GossipAttestationBatch { .. } => WorkType::GossipAttestationBatch, Work::GossipAggregate { .. } => WorkType::GossipAggregate, Work::GossipAggregateBatch { .. } => WorkType::GossipAggregateBatch, @@ -1330,9 +1320,6 @@ impl BeaconProcessor { match work { _ if can_spawn => self.spawn_worker(work, idle_tx), Work::GossipAttestation { .. } => attestation_queue.push(work), - Work::GossipAttestationToConvert { .. } => { - attestation_to_convert_queue.push(work) - } // Attestation batches are formed internally within the // `BeaconProcessor`, they are not sent from external services. Work::GossipAttestationBatch { .. } => crit!( @@ -1578,12 +1565,6 @@ impl BeaconProcessor { } => task_spawner.spawn_blocking(move || { process_individual(*attestation); }), - Work::GossipAttestationToConvert { - attestation, - process_individual, - } => task_spawner.spawn_blocking(move || { - process_individual(*attestation); - }), Work::GossipAttestationBatch { attestations, process_batch, diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index e53fecbdb7..9453f1725a 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -45,7 +45,6 @@ pub use block_id::BlockId; use builder_states::get_next_withdrawals; use bytes::Bytes; use directory::DEFAULT_ROOT_DIR; -use either::Either; use eth2::types::{ self as api_types, BroadcastValidation, ContextDeserialize, EndpointVersion, ForkChoice, ForkChoiceNode, LightClientUpdatesQuery, PublishBlockRequest, StateId as CoreStateId, @@ -64,7 +63,6 @@ pub use publish_blocks::{ publish_blinded_block, publish_block, reconstruct_block, ProvenancedBlock, }; use serde::{Deserialize, Serialize}; -use serde_json::Value; use slot_clock::SlotClock; use ssz::Encode; pub use state_id::StateId; @@ -87,13 +85,13 @@ use tokio_stream::{ StreamExt, }; use tracing::{debug, error, info, warn}; -use types::AttestationData; use types::{ - Attestation, AttestationShufflingId, AttesterSlashing, BeaconStateError, ChainSpec, Checkpoint, - CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, Hash256, ProposerPreparationData, - ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, - SignedBlsToExecutionChange, SignedContributionAndProof, SignedValidatorRegistrationData, - SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData, + Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError, + ChainSpec, Checkpoint, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, Hash256, + ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, + SignedBlindedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof, + SignedValidatorRegistrationData, SignedVoluntaryExit, SingleAttestation, Slot, + SyncCommitteeMessage, SyncContributionData, }; use validator::pubkey_to_validator_index; use version::{ @@ -1981,68 +1979,21 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()); - let post_beacon_pool_attestations_v1 = beacon_pool_path - .clone() - .and(warp::path("attestations")) - .and(warp::path::end()) - .and(warp_utils::json::json()) - .and(network_tx_filter.clone()) - .and(reprocess_send_filter.clone()) - .then( - |task_spawner: TaskSpawner, - chain: Arc>, - attestations: Vec>, - network_tx: UnboundedSender>, - reprocess_tx: Option>| async move { - let attestations = attestations.into_iter().map(Either::Left).collect(); - let result = crate::publish_attestations::publish_attestations( - task_spawner, - chain, - attestations, - network_tx, - reprocess_tx, - ) - .await - .map(|()| warp::reply::json(&())); - convert_rejection(result).await - }, - ); - let post_beacon_pool_attestations_v2 = beacon_pool_path_v2 .clone() .and(warp::path("attestations")) .and(warp::path::end()) - .and(warp_utils::json::json::()) + .and(warp_utils::json::json::>()) .and(optional_consensus_version_header_filter) .and(network_tx_filter.clone()) .and(reprocess_send_filter.clone()) .then( |task_spawner: TaskSpawner, chain: Arc>, - payload: Value, - fork_name: Option, + attestations: Vec, + _fork_name: Option, network_tx: UnboundedSender>, reprocess_tx: Option>| async move { - let attestations = - match crate::publish_attestations::deserialize_attestation_payload::( - payload, fork_name, - ) { - Ok(attestations) => attestations, - Err(err) => { - warn!( - error = ?err, - "Unable to deserialize attestation POST request" - ); - return warp::reply::with_status( - warp::reply::json( - &"Unable to deserialize request body".to_string(), - ), - eth2::StatusCode::BAD_REQUEST, - ) - .into_response(); - } - }; - let result = crate::publish_attestations::publish_attestations( task_spawner, chain, @@ -5058,7 +5009,6 @@ pub fn serve( .uor(post_beacon_blinded_blocks) .uor(post_beacon_blocks_v2) .uor(post_beacon_blinded_blocks_v2) - .uor(post_beacon_pool_attestations_v1) .uor(post_beacon_pool_attestations_v2) .uor(post_beacon_pool_attester_slashings) .uor(post_beacon_pool_proposer_slashings) diff --git a/beacon_node/http_api/src/publish_attestations.rs b/beacon_node/http_api/src/publish_attestations.rs index db85b8f205..3c18a8ec41 100644 --- a/beacon_node/http_api/src/publish_attestations.rs +++ b/beacon_node/http_api/src/publish_attestations.rs @@ -36,16 +36,13 @@ //! attestations and there's no immediate cause for concern. use crate::task_spawner::{Priority, TaskSpawner}; use beacon_chain::{ - single_attestation::single_attestation_to_attestation, validator_monitor::timestamp_now, - AttestationError, BeaconChain, BeaconChainError, BeaconChainTypes, + validator_monitor::timestamp_now, AttestationError, BeaconChain, BeaconChainError, + BeaconChainTypes, }; use beacon_processor::work_reprocessing_queue::{QueuedUnaggregate, ReprocessQueueMessage}; -use either::Either; use eth2::types::Failure; use lighthouse_network::PubsubMessage; use network::NetworkMessage; -use serde_json::Value; -use std::borrow::Cow; use std::sync::Arc; use std::time::Duration; use tokio::sync::{ @@ -53,7 +50,7 @@ use tokio::sync::{ oneshot, }; use tracing::{debug, error, warn}; -use types::{Attestation, EthSpec, ForkName, SingleAttestation}; +use types::SingleAttestation; // Error variants are only used in `Debug` and considered `dead_code` by the compiler. #[derive(Debug)] @@ -65,8 +62,6 @@ pub enum Error { ReprocessDisabled, ReprocessFull, ReprocessTimeout, - InvalidJson(#[allow(dead_code)] serde_json::Error), - FailedConversion(#[allow(dead_code)] Box), } enum PublishAttestationResult { @@ -76,66 +71,24 @@ enum PublishAttestationResult { Failure(Error), } -#[allow(clippy::type_complexity)] -pub fn deserialize_attestation_payload( - payload: Value, - fork_name: Option, -) -> Result, SingleAttestation>>, Error> { - if fork_name.is_some_and(|fork_name| fork_name.electra_enabled()) || fork_name.is_none() { - if fork_name.is_none() { - warn!("No Consensus Version header specified."); - } - - Ok(serde_json::from_value::>(payload) - .map_err(Error::InvalidJson)? - .into_iter() - .map(Either::Right) - .collect()) - } else { - Ok( - serde_json::from_value::>>(payload) - .map_err(Error::InvalidJson)? - .into_iter() - .map(Either::Left) - .collect(), - ) - } -} - fn verify_and_publish_attestation( chain: &Arc>, - either_attestation: &Either, SingleAttestation>, + attestation: &SingleAttestation, seen_timestamp: Duration, network_tx: &UnboundedSender>, ) -> Result<(), Error> { - let attestation = convert_to_attestation(chain, either_attestation)?; let verified_attestation = chain - .verify_unaggregated_attestation_for_gossip(&attestation, None) + .verify_unaggregated_attestation_for_gossip(attestation, None) .map_err(Error::Validation)?; - match either_attestation { - Either::Left(attestation) => { - // Publish. - network_tx - .send(NetworkMessage::Publish { - messages: vec![PubsubMessage::Attestation(Box::new(( - verified_attestation.subnet_id(), - attestation.clone(), - )))], - }) - .map_err(|_| Error::Publication)?; - } - Either::Right(single_attestation) => { - network_tx - .send(NetworkMessage::Publish { - messages: vec![PubsubMessage::SingleAttestation(Box::new(( - verified_attestation.subnet_id(), - single_attestation.clone(), - )))], - }) - .map_err(|_| Error::Publication)?; - } - } + network_tx + .send(NetworkMessage::Publish { + messages: vec![PubsubMessage::Attestation(Box::new(( + verified_attestation.subnet_id(), + attestation.clone(), + )))], + }) + .map_err(|_| Error::Publication)?; // Notify the validator monitor. chain @@ -172,57 +125,10 @@ fn verify_and_publish_attestation( } } -fn convert_to_attestation<'a, T: BeaconChainTypes>( - chain: &Arc>, - attestation: &'a Either, SingleAttestation>, -) -> Result>, Error> { - match attestation { - Either::Left(a) => Ok(Cow::Borrowed(a)), - Either::Right(single_attestation) => { - let conversion_result = chain.with_committee_cache( - single_attestation.data.target.root, - single_attestation - .data - .slot - .epoch(T::EthSpec::slots_per_epoch()), - |committee_cache, _| { - let Some(committee) = committee_cache.get_beacon_committee( - single_attestation.data.slot, - single_attestation.committee_index, - ) else { - return Ok(Err(AttestationError::NoCommitteeForSlotAndIndex { - slot: single_attestation.data.slot, - index: single_attestation.committee_index, - })); - }; - - Ok(single_attestation_to_attestation::( - single_attestation, - committee.committee, - ) - .map(Cow::Owned)) - }, - ); - match conversion_result { - Ok(Ok(attestation)) => Ok(attestation), - Ok(Err(e)) => Err(Error::Validation(e)), - // Map the error returned by `with_committee_cache` for unknown blocks into the - // `UnknownHeadBlock` error that is gracefully handled. - Err(BeaconChainError::MissingBeaconBlock(beacon_block_root)) => { - Err(Error::Validation(AttestationError::UnknownHeadBlock { - beacon_block_root, - })) - } - Err(e) => Err(Error::FailedConversion(Box::new(e))), - } - } - } -} - pub async fn publish_attestations( task_spawner: TaskSpawner, chain: Arc>, - attestations: Vec, SingleAttestation>>, + attestations: Vec, network_tx: UnboundedSender>, reprocess_send: Option>, ) -> Result<(), warp::Rejection> { @@ -230,10 +136,7 @@ pub async fn publish_attestations( // move the `attestations` vec into the blocking task, so this small overhead is unavoidable. let attestation_metadata = attestations .iter() - .map(|att| match att { - Either::Left(att) => (att.data().slot, att.committee_index()), - Either::Right(att) => (att.data.slot, Some(att.committee_index)), - }) + .map(|att| (att.data.slot, Some(att.committee_index))) .collect::>(); // Gossip validate and publish attestations that can be immediately processed. diff --git a/beacon_node/http_api/tests/fork_tests.rs b/beacon_node/http_api/tests/fork_tests.rs index 10e1d01536..dcc6d13ec4 100644 --- a/beacon_node/http_api/tests/fork_tests.rs +++ b/beacon_node/http_api/tests/fork_tests.rs @@ -149,10 +149,41 @@ async fn attestations_across_fork_with_skip_slots() { .flat_map(|(atts, _)| atts.iter().map(|(att, _)| att.clone())) .collect::>(); + let unaggregated_attestations = unaggregated_attestations + .into_iter() + .map(|attn| { + let aggregation_bits = attn.get_aggregation_bits(); + + if aggregation_bits.len() != 1 { + panic!("Must be an unaggregated attestation") + } + + let aggregation_bit = *aggregation_bits.first().unwrap(); + + let committee = fork_state + .get_beacon_committee(attn.data().slot, attn.committee_index().unwrap()) + .unwrap(); + + let attester_index = committee + .committee + .iter() + .enumerate() + .find_map(|(i, &index)| { + if aggregation_bit as usize == i { + return Some(index); + } + None + }) + .unwrap(); + attn.to_single_attestation_with_attester_index(attester_index as u64) + .unwrap() + }) + .collect::>(); + assert!(!unaggregated_attestations.is_empty()); let fork_name = harness.spec.fork_name_at_slot::(fork_slot); client - .post_beacon_pool_attestations_v1(&unaggregated_attestations) + .post_beacon_pool_attestations_v2::(unaggregated_attestations, fork_name) .await .unwrap(); diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 4f3cd6c828..399474c85c 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -5,7 +5,6 @@ use beacon_chain::{ ChainConfig, }; use beacon_processor::work_reprocessing_queue::ReprocessQueueMessage; -use either::Either; use eth2::types::ProduceBlockV3Response; use eth2::types::{DepositContractData, StateId}; use execution_layer::{ForkchoiceState, PayloadAttributes}; @@ -539,7 +538,7 @@ pub async fn proposer_boost_re_org_test( slot_a, num_parent_votes, ); - harness.process_attestations(block_a_parent_votes); + harness.process_attestations(block_a_parent_votes, &state_a); // Attest to block A during slot B. for _ in 0..parent_distance { @@ -553,7 +552,7 @@ pub async fn proposer_boost_re_org_test( slot_b, num_empty_votes, ); - harness.process_attestations(block_a_empty_votes); + harness.process_attestations(block_a_empty_votes, &state_a); let remaining_attesters = all_validators .iter() @@ -586,7 +585,7 @@ pub async fn proposer_boost_re_org_test( slot_b, num_head_votes, ); - harness.process_attestations(block_b_head_votes); + harness.process_attestations(block_b_head_votes, &state_b); let payload_lookahead = harness.chain.config.prepare_payload_lookahead; let fork_choice_lookahead = Duration::from_millis(500); @@ -818,10 +817,10 @@ pub async fn fork_choice_before_proposal() { block_root_c, slot_c, ); - harness.process_attestations(attestations_c); + harness.process_attestations(attestations_c, &state_c); // Apply the attestations to B, but don't re-run fork choice. - harness.process_attestations(attestations_b); + harness.process_attestations(attestations_b, &state_b); // Due to proposer boost, the head should be C during slot C. assert_eq!( @@ -894,7 +893,7 @@ async fn queue_attestations_from_http() { let fork_name = tester.harness.spec.fork_name_at_slot::(attestation_slot); // Make attestations to the block and POST them to the beacon node on a background thread. - let attestation_future = if fork_name.electra_enabled() { + let attestation_future = { let single_attestations = harness .make_single_attestations( &all_validators, @@ -907,30 +906,9 @@ async fn queue_attestations_from_http() { .flat_map(|attestations| attestations.into_iter().map(|(att, _subnet)| att)) .collect::>(); - let attestations = Either::Right(single_attestations); - tokio::spawn(async move { client - .post_beacon_pool_attestations_v2::(attestations, fork_name) - .await - .expect("attestations should be processed successfully") - }) - } else { - let attestations = harness - .make_unaggregated_attestations( - &all_validators, - &post_state, - block.0.state_root(), - block_root.into(), - attestation_slot, - ) - .into_iter() - .flat_map(|attestations| attestations.into_iter().map(|(att, _subnet)| att)) - .collect::>(); - - tokio::spawn(async move { - client - .post_beacon_pool_attestations_v1(&attestations) + .post_beacon_pool_attestations_v2::(single_attestations, fork_name) .await .expect("attestations should be processed successfully") }) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 4ad70c3467..c23ab92415 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -3,7 +3,6 @@ use beacon_chain::{ test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType}, BeaconChain, ChainConfig, StateSkipConfig, WhenSlotSkipped, }; -use either::Either; use eth2::{ mixin::{RequestAccept, ResponseForkName, ResponseOptional}, reqwest::RequestBuilder, @@ -1907,18 +1906,46 @@ impl ApiTester { } pub async fn test_post_beacon_pool_attestations_valid(mut self) -> Self { - self.client - .post_beacon_pool_attestations_v1(self.attestations.as_slice()) - .await - .unwrap(); - let fork_name = self .attestations .first() .map(|att| self.chain.spec.fork_name_at_slot::(att.data().slot)) .unwrap(); - let attestations = Either::Left(self.attestations.clone()); + let state = &self.chain.head_snapshot().beacon_state; + + let attestations = self + .attestations + .clone() + .into_iter() + .map(|attn| { + let aggregation_bits = attn.get_aggregation_bits(); + + if aggregation_bits.len() != 1 { + panic!("Must be an unaggregated attestation") + } + + let aggregation_bit = *aggregation_bits.first().unwrap(); + + let committee = state + .get_beacon_committee(attn.data().slot, attn.committee_index().unwrap()) + .unwrap(); + + let attester_index = committee + .committee + .iter() + .enumerate() + .find_map(|(i, &index)| { + if aggregation_bit as usize == i { + return Some(index); + } + None + }) + .unwrap(); + attn.to_single_attestation_with_attester_index(attester_index as u64) + .unwrap() + }) + .collect::>(); self.client .post_beacon_pool_attestations_v2::(attestations, fork_name) @@ -1943,9 +1970,8 @@ impl ApiTester { .map(|att| self.chain.spec.fork_name_at_slot::(att.data.slot)) .unwrap(); - let attestations = Either::Right(self.single_attestations.clone()); self.client - .post_beacon_pool_attestations_v2::(attestations, fork_name) + .post_beacon_pool_attestations_v2::(self.single_attestations.clone(), fork_name) .await .unwrap(); assert!( @@ -1958,18 +1984,87 @@ impl ApiTester { pub async fn test_post_beacon_pool_attestations_invalid_v1(mut self) -> Self { let mut attestations = Vec::new(); + let state = &self.chain.head_snapshot().beacon_state; for attestation in &self.attestations { let mut invalid_attestation = attestation.clone(); invalid_attestation.data_mut().slot += 1; + // Convert valid attestation into valid `SingleAttestation` + let aggregation_bits = attestation.get_aggregation_bits(); + + if aggregation_bits.len() != 1 { + panic!("Must be an unaggregated attestation") + } + + let aggregation_bit = *aggregation_bits.first().unwrap(); + + let committee = state + .get_beacon_committee( + attestation.data().slot, + attestation.committee_index().unwrap(), + ) + .unwrap(); + + let attester_index = committee + .committee + .iter() + .enumerate() + .find_map(|(i, &index)| { + if aggregation_bit as usize == i { + return Some(index); + } + None + }) + .unwrap(); + let attestation = attestation + .to_single_attestation_with_attester_index(attester_index as u64) + .unwrap(); + + // Convert invalid attestation to invalid `SingleAttestation` + let aggregation_bits = invalid_attestation.get_aggregation_bits(); + + if aggregation_bits.len() != 1 { + panic!("Must be an unaggregated attestation") + } + + let aggregation_bit = *aggregation_bits.first().unwrap(); + + let committee = state + .get_beacon_committee( + invalid_attestation.data().slot, + invalid_attestation.committee_index().unwrap(), + ) + .unwrap(); + + let attester_index = committee + .committee + .iter() + .enumerate() + .find_map(|(i, &index)| { + if aggregation_bit as usize == i { + return Some(index); + } + None + }) + .unwrap(); + let invalid_attestation = invalid_attestation + .to_single_attestation_with_attester_index(attester_index as u64) + .unwrap(); + // add both to ensure we only fail on invalid attestations attestations.push(attestation.clone()); attestations.push(invalid_attestation); } + let fork_name = self + .attestations + .first() + .map(|att| self.chain.spec.fork_name_at_slot::(att.data().slot)) + .unwrap(); + let err = self .client - .post_beacon_pool_attestations_v1(attestations.as_slice()) + .post_beacon_pool_attestations_v2::(attestations, fork_name) .await .unwrap_err(); @@ -2011,7 +2106,6 @@ impl ApiTester { .first() .map(|att| self.chain.spec.fork_name_at_slot::(att.data().slot)) .unwrap(); - let attestations = Either::Right(attestations); let err_v2 = self .client .post_beacon_pool_attestations_v2::(attestations, fork_name) @@ -4177,9 +4271,47 @@ impl ApiTester { assert_eq!(result, expected); + let attestations = self + .attestations + .clone() + .into_iter() + .map(|attn| { + let aggregation_bits = attn.get_aggregation_bits(); + + if aggregation_bits.len() != 1 { + panic!("Must be an unaggregated attestation") + } + + let aggregation_bit = *aggregation_bits.first().unwrap(); + + let committee = head_state + .get_beacon_committee(attn.data().slot, attn.committee_index().unwrap()) + .unwrap(); + + let attester_index = committee + .committee + .iter() + .enumerate() + .find_map(|(i, &index)| { + if aggregation_bit as usize == i { + return Some(index); + } + None + }) + .unwrap(); + attn.to_single_attestation_with_attester_index(attester_index as u64) + .unwrap() + }) + .collect::>(); + + let fork_name = self + .chain + .spec + .fork_name_at_slot::(attestations.first().unwrap().data.slot); + // Attest to the current slot self.client - .post_beacon_pool_attestations_v1(self.attestations.as_slice()) + .post_beacon_pool_attestations_v2::(attestations, fork_name) .await .unwrap(); @@ -5916,9 +6048,47 @@ impl ApiTester { assert_eq!(result, expected); + let attestations = self + .attestations + .clone() + .into_iter() + .map(|attn| { + let aggregation_bits = attn.get_aggregation_bits(); + + if aggregation_bits.len() != 1 { + panic!("Must be an unaggregated attestation") + } + + let aggregation_bit = *aggregation_bits.first().unwrap(); + + let committee = head_state + .get_beacon_committee(attn.data().slot, attn.committee_index().unwrap()) + .unwrap(); + + let attester_index = committee + .committee + .iter() + .enumerate() + .find_map(|(i, &index)| { + if aggregation_bit as usize == i { + return Some(index); + } + None + }) + .unwrap(); + attn.to_single_attestation_with_attester_index(attester_index as u64) + .unwrap() + }) + .collect::>(); + + let fork_name = self + .chain + .spec + .fork_name_at_slot::(attestations.first().unwrap().data.slot); + // Attest to the current slot self.client - .post_beacon_pool_attestations_v1(self.attestations.as_slice()) + .post_beacon_pool_attestations_v2::(attestations, fork_name) .await .unwrap(); @@ -5973,8 +6143,47 @@ impl ApiTester { let expected_attestation_len = self.attestations.len(); + let state = self.harness.get_current_state(); + let attestations = self + .attestations + .clone() + .into_iter() + .map(|attn| { + let aggregation_bits = attn.get_aggregation_bits(); + + if aggregation_bits.len() != 1 { + panic!("Must be an unaggregated attestation") + } + + let aggregation_bit = *aggregation_bits.first().unwrap(); + + let committee = state + .get_beacon_committee(attn.data().slot, attn.committee_index().unwrap()) + .unwrap(); + + let attester_index = committee + .committee + .iter() + .enumerate() + .find_map(|(i, &index)| { + if aggregation_bit as usize == i { + return Some(index); + } + None + }) + .unwrap(); + attn.to_single_attestation_with_attester_index(attester_index as u64) + .unwrap() + }) + .collect::>(); + + let fork_name = self + .chain + .spec + .fork_name_at_slot::(attestations.first().unwrap().data.slot); + self.client - .post_beacon_pool_attestations_v1(self.attestations.as_slice()) + .post_beacon_pool_attestations_v2::(attestations, fork_name) .await .unwrap(); @@ -6247,9 +6456,9 @@ impl ApiTester { .chain .spec .fork_name_at_slot::(self.chain.slot().unwrap()); - let attestations = Either::Right(self.single_attestations.clone()); + self.client - .post_beacon_pool_attestations_v2::(attestations, fork_name) + .post_beacon_pool_attestations_v2::(self.single_attestations.clone(), fork_name) .await .unwrap(); diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index 880b387250..21df75a648 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -7,8 +7,8 @@ use ssz::{Decode, Encode}; use std::io::{Error, ErrorKind}; use std::sync::Arc; use types::{ - Attestation, AttestationBase, AttesterSlashing, AttesterSlashingBase, AttesterSlashingElectra, - BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec, ForkContext, ForkName, + AttesterSlashing, AttesterSlashingBase, AttesterSlashingElectra, BlobSidecar, + DataColumnSidecar, DataColumnSubnetId, EthSpec, ForkContext, ForkName, LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedAggregateAndProofBase, SignedAggregateAndProofElectra, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix, @@ -27,10 +27,8 @@ pub enum PubsubMessage { DataColumnSidecar(Box<(DataColumnSubnetId, Arc>)>), /// Gossipsub message providing notification of a Aggregate attestation and associated proof. AggregateAndProofAttestation(Box>), - /// Gossipsub message providing notification of a raw un-aggregated attestation with its subnet id. - Attestation(Box<(SubnetId, Attestation)>), - /// Gossipsub message providing notification of a `SingleAttestation`` with its subnet id. - SingleAttestation(Box<(SubnetId, SingleAttestation)>), + /// Gossipsub message providing notification of a `SingleAttestation` with its subnet id. + Attestation(Box<(SubnetId, SingleAttestation)>), /// Gossipsub message providing notification of a voluntary exit. VoluntaryExit(Box), /// Gossipsub message providing notification of a new proposer slashing. @@ -140,9 +138,6 @@ impl PubsubMessage { PubsubMessage::Attestation(attestation_data) => { GossipKind::Attestation(attestation_data.0) } - PubsubMessage::SingleAttestation(attestation_data) => { - GossipKind::Attestation(attestation_data.0) - } PubsubMessage::VoluntaryExit(_) => GossipKind::VoluntaryExit, PubsubMessage::ProposerSlashing(_) => GossipKind::ProposerSlashing, PubsubMessage::AttesterSlashing(_) => GossipKind::AttesterSlashing, @@ -203,32 +198,12 @@ impl PubsubMessage { ))) } GossipKind::Attestation(subnet_id) => { - match fork_context.from_context_bytes(gossip_topic.fork_digest) { - Some(&fork_name) => { - if fork_name.electra_enabled() { - let single_attestation = - SingleAttestation::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?; - Ok(PubsubMessage::SingleAttestation(Box::new(( - *subnet_id, - single_attestation, - )))) - } else { - let attestation = Attestation::Base( - AttestationBase::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?, - ); - Ok(PubsubMessage::Attestation(Box::new(( - *subnet_id, - attestation, - )))) - } - } - None => Err(format!( - "Unknown gossipsub fork digest: {:?}", - gossip_topic.fork_digest - )), - } + let attestation = SingleAttestation::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?; + Ok(PubsubMessage::Attestation(Box::new(( + *subnet_id, + attestation, + )))) } GossipKind::BeaconBlock => { let beacon_block = @@ -418,7 +393,6 @@ impl PubsubMessage { PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(), PubsubMessage::AttesterSlashing(data) => data.as_ssz_bytes(), PubsubMessage::Attestation(data) => data.1.as_ssz_bytes(), - PubsubMessage::SingleAttestation(data) => data.1.as_ssz_bytes(), PubsubMessage::SignedContributionAndProof(data) => data.as_ssz_bytes(), PubsubMessage::SyncCommitteeMessage(data) => data.1.as_ssz_bytes(), PubsubMessage::BlsToExecutionChange(data) => data.as_ssz_bytes(), @@ -457,13 +431,6 @@ impl std::fmt::Display for PubsubMessage { att.message().aggregator_index(), ), PubsubMessage::Attestation(data) => write!( - f, - "Attestation: subnet_id: {}, attestation_slot: {}, attestation_index: {:?}", - *data.0, - data.1.data().slot, - data.1.committee_index(), - ), - PubsubMessage::SingleAttestation(data) => write!( f, "SingleAttestation: subnet_id: {}, attestation_slot: {}, committee_index: {:?}, attester_index: {:?}", *data.0, diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index d9aab07d5f..d1a75809a9 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -14,7 +14,6 @@ use beacon_chain::{ light_client_finality_update_verification::Error as LightClientFinalityUpdateError, light_client_optimistic_update_verification::Error as LightClientOptimisticUpdateError, observed_operations::ObservationOutcome, - single_attestation::single_attestation_to_attestation, sync_committee_verification::{self, Error as SyncCommitteeError}, validator_monitor::{get_block_delay_ms, get_slot_delay_ms}, AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, @@ -84,8 +83,8 @@ impl VerifiedAttestation for VerifiedUnaggregate { } /// An attestation that failed validation by the `BeaconChain`. -struct RejectedUnaggregate { - attestation: Box>, +struct RejectedUnaggregate { + attestation: Box, error: AttnError, } @@ -126,16 +125,11 @@ struct RejectedAggregate { /// Data for an aggregated or unaggregated attestation that failed verification. enum FailedAtt { Unaggregate { - attestation: Box>, + attestation: Box, subnet_id: SubnetId, should_import: bool, seen_timestamp: Duration, }, - // This variant is just a dummy variant for now, as SingleAttestation reprocessing is handled - // separately. - SingleUnaggregate { - attestation: Box, - }, Aggregate { attestation: Box>, seen_timestamp: Duration, @@ -150,15 +144,13 @@ impl FailedAtt { pub fn kind(&self) -> &'static str { match self { FailedAtt::Unaggregate { .. } => "unaggregated", - FailedAtt::SingleUnaggregate { .. } => "unaggregated", FailedAtt::Aggregate { .. } => "aggregated", } } pub fn attestation_data(&self) -> &AttestationData { match self { - FailedAtt::Unaggregate { attestation, .. } => attestation.data(), - FailedAtt::SingleUnaggregate { attestation, .. } => &attestation.data, + FailedAtt::Unaggregate { attestation, .. } => &attestation.data, FailedAtt::Aggregate { attestation, .. } => attestation.message().aggregate().data(), } } @@ -210,7 +202,7 @@ impl NetworkBeaconProcessor { self: Arc, message_id: MessageId, peer_id: PeerId, - attestation: Box>, + attestation: Box, subnet_id: SubnetId, should_import: bool, reprocess_tx: Option>, @@ -220,10 +212,14 @@ impl NetworkBeaconProcessor { .chain .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id)) { - Ok(verified_attestation) => Ok(VerifiedUnaggregate { - indexed_attestation: verified_attestation.into_indexed_attestation(), - attestation, - }), + Ok(verified_attestation) => { + let attestation = + Box::new(verified_attestation.attestation().clone_as_attestation()); + Ok(VerifiedUnaggregate { + indexed_attestation: verified_attestation.into_indexed_attestation(), + attestation, + }) + } Err(error) => Err(RejectedUnaggregate { attestation, error }), }; @@ -240,7 +236,7 @@ impl NetworkBeaconProcessor { pub fn process_gossip_attestation_batch( self: Arc, - packages: GossipAttestationBatch, + packages: GossipAttestationBatch, reprocess_tx: Option>, ) { let attestations_and_subnets = packages @@ -277,14 +273,19 @@ impl NetworkBeaconProcessor { #[allow(clippy::needless_collect)] // The clippy suggestion fails the borrow checker. let results = results .into_iter() - .map(|result| result.map(|verified| verified.into_indexed_attestation())) + .map(|result| { + result.map(|verified| { + let attestation = verified.attestation().clone_as_attestation(); + (verified.into_indexed_attestation(), attestation) + }) + }) .collect::>(); for (result, package) in results.into_iter().zip(packages.into_iter()) { let result = match result { - Ok(indexed_attestation) => Ok(VerifiedUnaggregate { + Ok((indexed_attestation, attestation)) => Ok(VerifiedUnaggregate { indexed_attestation, - attestation: package.attestation, + attestation: Box::new(attestation), }), Err(error) => Err(RejectedUnaggregate { attestation: package.attestation, @@ -309,7 +310,7 @@ impl NetworkBeaconProcessor { #[allow(clippy::too_many_arguments)] fn process_gossip_attestation_result( self: &Arc, - result: Result, RejectedUnaggregate>, + result: Result, RejectedUnaggregate>, message_id: MessageId, peer_id: PeerId, subnet_id: SubnetId, @@ -405,147 +406,6 @@ impl NetworkBeaconProcessor { } } - /// Process an unaggregated attestation requiring conversion. - /// - /// This function performs the conversion, and if successfull queues a new message to be - /// processed by `process_gossip_attestation`. If unsuccessful due to block unavailability, - /// a retry message will be pushed to the `reprocess_tx` if it is `Some`. - #[allow(clippy::too_many_arguments)] - pub fn process_gossip_attestation_to_convert( - self: Arc, - message_id: MessageId, - peer_id: PeerId, - single_attestation: Box, - subnet_id: SubnetId, - should_import: bool, - reprocess_tx: Option>, - seen_timestamp: Duration, - ) { - let conversion_result = self.chain.with_committee_cache( - single_attestation.data.target.root, - single_attestation - .data - .slot - .epoch(T::EthSpec::slots_per_epoch()), - |committee_cache, _| { - let slot = single_attestation.data.slot; - let committee_index = single_attestation.committee_index; - let Some(committee) = committee_cache.get_beacon_committee(slot, committee_index) - else { - return Ok(Err(AttnError::NoCommitteeForSlotAndIndex { - slot, - index: committee_index, - })); - }; - - Ok(single_attestation_to_attestation( - &single_attestation, - committee.committee, - )) - }, - ); - - match conversion_result { - Ok(Ok(attestation)) => { - let slot = attestation.data().slot; - if let Err(e) = self.send_unaggregated_attestation( - message_id.clone(), - peer_id, - attestation, - subnet_id, - should_import, - seen_timestamp, - ) { - error!( - error = %e, - %slot, - "Unable to queue converted SingleAttestation" - ); - self.propagate_validation_result( - message_id, - peer_id, - MessageAcceptance::Ignore, - ); - } - } - // Outermost error (from `with_committee_cache`) indicating that the block is not known - // and that this conversion should be retried. - Err(BeaconChainError::MissingBeaconBlock(beacon_block_root)) => { - if let Some(sender) = reprocess_tx { - metrics::inc_counter( - &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_REQUEUED_TOTAL, - ); - // We don't know the block, get the sync manager to handle the block lookup, and - // send the attestation to be scheduled for re-processing. - self.sync_tx - .send(SyncMessage::UnknownBlockHashFromAttestation( - peer_id, - beacon_block_root, - )) - .unwrap_or_else(|_| { - warn!(msg = "UnknownBlockHash", "Failed to send to sync service") - }); - let processor = self.clone(); - // Do not allow this attestation to be re-processed beyond this point. - let reprocess_msg = - ReprocessQueueMessage::UnknownBlockUnaggregate(QueuedUnaggregate { - beacon_block_root, - process_fn: Box::new(move || { - processor.process_gossip_attestation_to_convert( - message_id, - peer_id, - single_attestation, - subnet_id, - should_import, - None, - seen_timestamp, - ) - }), - }); - if sender.try_send(reprocess_msg).is_err() { - error!("Failed to send attestation for re-processing") - } - } else { - // We shouldn't make any further attempts to process this attestation. - // - // Don't downscore the peer since it's not clear if we requested this head - // block from them or not. - self.propagate_validation_result( - message_id, - peer_id, - MessageAcceptance::Ignore, - ); - } - } - Ok(Err(error)) => { - // We already handled reprocessing above so do not attempt it in the error handler. - self.handle_attestation_verification_failure( - peer_id, - message_id, - FailedAtt::SingleUnaggregate { - attestation: single_attestation, - }, - None, - error, - seen_timestamp, - ); - } - Err(error) => { - // We already handled reprocessing above so do not attempt it in the error handler. - self.handle_attestation_verification_failure( - peer_id, - message_id, - FailedAtt::SingleUnaggregate { - attestation: single_attestation, - }, - None, - AttnError::BeaconChainError(Box::new(error)), - seen_timestamp, - ); - } - } - } - /// Process the aggregated attestation received from the gossip network and: /// /// - If it passes gossip propagation criteria, tell the network thread to forward it. @@ -2530,16 +2390,6 @@ impl NetworkBeaconProcessor { }), }) } - FailedAtt::SingleUnaggregate { .. } => { - // This should never happen, as we handle the unknown head block case - // for `SingleAttestation`s separately and should not be able to hit - // an `UnknownHeadBlock` error. - error!( - block_root = ?beacon_block_root, - "Dropping SingleAttestation instead of requeueing" - ); - return; - } FailedAtt::Unaggregate { attestation, subnet_id, @@ -2635,19 +2485,6 @@ impl NetworkBeaconProcessor { "attn_no_committee", ); } - AttnError::NotExactlyOneAggregationBitSet(_) => { - /* - * The unaggregated attestation doesn't have only one signature. - * - * The peer has published an invalid consensus message. - */ - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); - self.gossip_penalize_peer( - peer_id, - PeerAction::LowToleranceError, - "attn_too_many_agg_bits", - ); - } AttnError::NotExactlyOneCommitteeBitSet(_) => { /* * The attestation doesn't have only one committee bit set. diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 0b89058ba9..311c09294b 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -75,20 +75,21 @@ impl NetworkBeaconProcessor { self.beacon_processor_send.try_send(event) } - /// Create a new `Work` event for some `SingleAttestation`. - pub fn send_single_attestation( + /// Create a new `Work` event for some unaggregated attestation. + pub fn send_unaggregated_attestation( self: &Arc, message_id: MessageId, peer_id: PeerId, - single_attestation: SingleAttestation, + attestation: SingleAttestation, subnet_id: SubnetId, should_import: bool, seen_timestamp: Duration, ) -> Result<(), Error> { + // Define a closure for processing individual attestations. let processor = self.clone(); let process_individual = move |package: GossipAttestationPackage| { let reprocess_tx = processor.reprocess_tx.clone(); - processor.process_gossip_attestation_to_convert( + processor.process_gossip_attestation( package.message_id, package.peer_id, package.attestation, @@ -99,48 +100,6 @@ impl NetworkBeaconProcessor { ) }; - self.try_send(BeaconWorkEvent { - drop_during_sync: true, - work: Work::GossipAttestationToConvert { - attestation: Box::new(GossipAttestationPackage { - message_id, - peer_id, - attestation: Box::new(single_attestation), - subnet_id, - should_import, - seen_timestamp, - }), - process_individual: Box::new(process_individual), - }, - }) - } - - /// Create a new `Work` event for some unaggregated attestation. - pub fn send_unaggregated_attestation( - self: &Arc, - message_id: MessageId, - peer_id: PeerId, - attestation: Attestation, - subnet_id: SubnetId, - should_import: bool, - seen_timestamp: Duration, - ) -> Result<(), Error> { - // Define a closure for processing individual attestations. - let processor = self.clone(); - let process_individual = - move |package: GossipAttestationPackage>| { - let reprocess_tx = processor.reprocess_tx.clone(); - processor.process_gossip_attestation( - package.message_id, - package.peer_id, - package.attestation, - package.subnet_id, - package.should_import, - Some(reprocess_tx), - package.seen_timestamp, - ) - }; - // Define a closure for processing batches of attestations. let processor = self.clone(); let process_batch = move |attestations| { diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index dbaa92ce08..4c107cfc87 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -36,9 +36,9 @@ use std::time::Duration; use tokio::sync::mpsc; use types::blob_sidecar::FixedBlobSidecarList; use types::{ - Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecarList, + AttesterSlashing, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecarList, DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, - SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId, + SignedBeaconBlock, SignedVoluntaryExit, SingleAttestation, Slot, SubnetId, }; type E = MainnetEthSpec; @@ -60,8 +60,8 @@ struct TestRig { next_block: Arc>, next_blobs: Option>, next_data_columns: Option>, - attestations: Vec<(Attestation, SubnetId)>, - next_block_attestations: Vec<(Attestation, SubnetId)>, + attestations: Vec<(SingleAttestation, SubnetId)>, + next_block_attestations: Vec<(SingleAttestation, SubnetId)>, next_block_aggregate_attestations: Vec>, attester_slashing: AttesterSlashing, proposer_slashing: ProposerSlashing, @@ -143,7 +143,7 @@ impl TestRig { let head_state_root = head.beacon_state_root(); let attestations = harness - .get_unaggregated_attestations( + .get_single_attestations( &AttestationStrategy::AllValidators, &head.beacon_state, head_state_root, @@ -160,7 +160,7 @@ impl TestRig { ); let next_block_attestations = harness - .get_unaggregated_attestations( + .get_single_attestations( &AttestationStrategy::AllValidators, &next_state, next_block_tuple.0.state_root(), diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 2a7bc597c2..960a1203a6 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -354,17 +354,6 @@ impl Router { timestamp_now(), ), ), - PubsubMessage::SingleAttestation(subnet_attestation) => self - .handle_beacon_processor_send_result( - self.network_beacon_processor.send_single_attestation( - message_id, - peer_id, - subnet_attestation.1, - subnet_attestation.0, - should_process, - timestamp_now(), - ), - ), PubsubMessage::BeaconBlock(block) => self.handle_beacon_processor_send_result( self.network_beacon_processor.send_gossip_beacon_block( message_id, diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index ba96ce4731..b120d67007 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -554,23 +554,7 @@ impl NetworkService { // the attestation, else we just just propagate the Attestation. let should_process = self.subnet_service.should_process_attestation( Subnet::Attestation(subnet_id), - attestation.data(), - ); - self.send_to_router(RouterMessage::PubsubMessage( - id, - source, - message, - should_process, - )); - } - PubsubMessage::SingleAttestation(ref subnet_and_attestation) => { - let subnet_id = subnet_and_attestation.0; - let single_attestation = &subnet_and_attestation.1; - // checks if we have an aggregator for the slot. If so, we should process - // the attestation, else we just just propagate the Attestation. - let should_process = self.subnet_service.should_process_attestation( - Subnet::Attestation(subnet_id), - &single_attestation.data, + &attestation.data, ); self.send_to_router(RouterMessage::PubsubMessage( id, diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 52cc91ba29..1dd2970c10 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -18,7 +18,6 @@ use self::mixin::{RequestAccept, ResponseOptional}; use self::types::{Error as ResponseError, *}; use ::types::beacon_response::ExecutionOptimisticFinalizedBeaconResponse; use derivative::Derivative; -use either::Either; use futures::Stream; use futures_util::StreamExt; use libp2p_identity::PeerId; @@ -1434,29 +1433,10 @@ impl BeaconNodeHttpClient { .map(|opt| opt.map(BeaconResponse::ForkVersioned)) } - /// `POST v1/beacon/pool/attestations` - pub async fn post_beacon_pool_attestations_v1( - &self, - attestations: &[Attestation], - ) -> Result<(), Error> { - let mut path = self.eth_path(V1)?; - - path.path_segments_mut() - .map_err(|()| Error::InvalidUrl(self.server.clone()))? - .push("beacon") - .push("pool") - .push("attestations"); - - self.post_with_timeout(path, &attestations, self.timeouts.attestation) - .await?; - - Ok(()) - } - /// `POST v2/beacon/pool/attestations` pub async fn post_beacon_pool_attestations_v2( &self, - attestations: Either>, Vec>, + attestations: Vec, fork_name: ForkName, ) -> Result<(), Error> { let mut path = self.eth_path(V2)?; @@ -1467,26 +1447,13 @@ impl BeaconNodeHttpClient { .push("pool") .push("attestations"); - match attestations { - Either::Right(attestations) => { - self.post_with_timeout_and_consensus_header( - path, - &attestations, - self.timeouts.attestation, - fork_name, - ) - .await?; - } - Either::Left(attestations) => { - self.post_with_timeout_and_consensus_header( - path, - &attestations, - self.timeouts.attestation, - fork_name, - ) - .await?; - } - }; + self.post_with_timeout_and_consensus_header( + path, + &attestations, + self.timeouts.attestation, + fork_name, + ) + .await?; Ok(()) } diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index 95bdee574d..8d510d0e89 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -15,6 +15,7 @@ use std::fmt; use std::sync::Mutex; use std::time::Duration; use store::MemoryStore; +use types::SingleAttestation; use types::{ test_utils::generate_deterministic_keypair, BeaconBlockRef, BeaconState, ChainSpec, Checkpoint, Epoch, EthSpec, FixedBytesExtended, ForkName, Hash256, IndexedAttestation, MainnetEthSpec, @@ -463,10 +464,17 @@ impl ForkChoiceTest { ) .expect("should sign attestation"); + let single_attestation = SingleAttestation { + attester_index: validator_index as u64, + committee_index: validator_committee_index as u64, + data: attestation.data().clone(), + signature: attestation.signature().clone(), + }; + let mut verified_attestation = self .harness .chain - .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id)) + .verify_unaggregated_attestation_for_gossip(&single_attestation, Some(subnet_id)) .expect("precondition: should gossip verify attestation"); if let MutationDelay::Blocks(slots) = delay { diff --git a/consensus/types/src/attestation.rs b/consensus/types/src/attestation.rs index 286e4622f8..e9a1ab4ceb 100644 --- a/consensus/types/src/attestation.rs +++ b/consensus/types/src/attestation.rs @@ -1,7 +1,13 @@ -use crate::context_deserialize; +use super::{ + AggregateSignature, AttestationData, BitList, ChainSpec, Domain, EthSpec, Fork, SecretKey, + Signature, SignedRoot, +}; use crate::slot_data::SlotData; +use crate::{context_deserialize, IndexedAttestation}; use crate::{test_utils::TestRandom, Hash256, Slot}; -use crate::{Checkpoint, ContextDeserialize, ForkName}; +use crate::{ + Checkpoint, ContextDeserialize, ForkName, IndexedAttestationBase, IndexedAttestationElectra, +}; use derivative::Derivative; use serde::{Deserialize, Deserializer, Serialize}; use ssz_derive::{Decode, Encode}; @@ -12,11 +18,6 @@ use superstruct::superstruct; use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; -use super::{ - AggregateSignature, AttestationData, BitList, ChainSpec, Domain, EthSpec, Fork, SecretKey, - Signature, SignedRoot, -}; - #[derive(Debug, PartialEq, Clone)] pub enum Error { SszTypesError(ssz_types::Error), @@ -246,10 +247,17 @@ impl Attestation { attester_index: u64, ) -> Result { match self { - Self::Base(_) => Err(Error::IncorrectStateVariant), + Self::Base(attn) => attn.to_single_attestation_with_attester_index(attester_index), Self::Electra(attn) => attn.to_single_attestation_with_attester_index(attester_index), } } + + pub fn get_aggregation_bits(&self) -> Vec { + match self { + Self::Base(attn) => attn.get_aggregation_bits(), + Self::Electra(attn) => attn.get_aggregation_bits(), + } + } } impl AttestationRef<'_, E> { @@ -461,6 +469,26 @@ impl AttestationBase { ) -> Result, ssz::BitfieldError> { self.aggregation_bits.resize::() } + + pub fn get_aggregation_bits(&self) -> Vec { + self.aggregation_bits + .iter() + .enumerate() + .filter_map(|(index, bit)| if bit { Some(index as u64) } else { None }) + .collect() + } + + pub fn to_single_attestation_with_attester_index( + &self, + attester_index: u64, + ) -> Result { + Ok(SingleAttestation { + committee_index: self.data.index, + attester_index, + data: self.data.clone(), + signature: self.signature.clone(), + }) + } } impl SlotData for Attestation { @@ -596,6 +624,24 @@ pub struct SingleAttestation { pub signature: AggregateSignature, } +impl SingleAttestation { + pub fn to_indexed(&self, fork_name: ForkName) -> IndexedAttestation { + if fork_name.electra_enabled() { + IndexedAttestation::Electra(IndexedAttestationElectra { + attesting_indices: vec![self.attester_index].into(), + data: self.data.clone(), + signature: self.signature.clone(), + }) + } else { + IndexedAttestation::Base(IndexedAttestationBase { + attesting_indices: vec![self.attester_index].into(), + data: self.data.clone(), + signature: self.signature.clone(), + }) + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index f776567706..e4063cd211 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -1,6 +1,5 @@ use crate::duties_service::{DutiesService, DutyAndProof}; use beacon_node_fallback::{ApiTopic, BeaconNodeFallback}; -use either::Either; use futures::future::join_all; use logging::crit; use slot_clock::SlotClock; @@ -461,40 +460,32 @@ impl AttestationService Some(a), - Err(e) => { - // This shouldn't happen unless BN and VC are out of sync with - // respect to the Electra fork. - error!( - error = ?e, - committee_index = attestation_data.index, - slot = slot.as_u64(), - "type" = "unaggregated", - "Unable to convert to SingleAttestation" - ); - None - } - } - }) - .collect::>(); - beacon_node - .post_beacon_pool_attestations_v2::( - Either::Right(single_attestations), - fork_name, - ) - .await - } else { - beacon_node - .post_beacon_pool_attestations_v1(attestations) - .await - } + let single_attestations = attestations + .iter() + .zip(validator_indices) + .filter_map(|(a, i)| { + match a.to_single_attestation_with_attester_index(*i) { + Ok(a) => Some(a), + Err(e) => { + // This shouldn't happen unless BN and VC are out of sync with + // respect to the Electra fork. + error!( + error = ?e, + committee_index = attestation_data.index, + slot = slot.as_u64(), + "type" = "unaggregated", + "Unable to convert to SingleAttestation" + ); + None + } + } + }) + .collect::>(); + + beacon_node + .post_beacon_pool_attestations_v2::(single_attestations, fork_name) + .await }) .await {