diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 093ee0c44b..072d671284 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -16,6 +16,7 @@ use crate::{ BeaconChain, BeaconChainTypes, BlockError, ChainConfig, ServerSentEventHandler, StateSkipConfig, }; +use attestation::SingleAttestation; use bls::get_withdrawal_credentials; use eth2::types::SignedBlockContentsTuple; use execution_layer::test_utils::generate_genesis_header; @@ -662,10 +663,14 @@ pub struct BeaconChainHarness { pub rng: Mutex, } +pub type CommitteeSingleAttestations = Vec<(SingleAttestation, SubnetId)>; pub type CommitteeAttestations = Vec<(Attestation, SubnetId)>; pub type HarnessAttestations = Vec<(CommitteeAttestations, Option>)>; +pub type HarnessSingleAttestations = + Vec<(CommitteeSingleAttestations, Option>)>; + pub type HarnessSyncContributions = Vec<( Vec<(SyncCommitteeMessage, usize)>, Option>, @@ -1023,6 +1028,79 @@ where ) } + pub fn produce_single_attestation_for_block( + &self, + slot: Slot, + index: CommitteeIndex, + beacon_block_root: Hash256, + mut state: Cow>, + state_root: Hash256, + aggregation_bit_index: usize, + validator_index: usize, + ) -> Result { + + let epoch = slot.epoch(E::slots_per_epoch()); + + if state.slot() > slot { + return Err(BeaconChainError::CannotAttestToFutureState); + } else if state.current_epoch() < epoch { + let mut_state = state.to_mut(); + complete_state_advance( + mut_state, + Some(state_root), + epoch.start_slot(E::slots_per_epoch()), + &self.spec, + )?; + mut_state.build_committee_cache(RelativeEpoch::Current, &self.spec)?; + } + + let committee_len = state.get_beacon_committee(slot, index)?.committee.len(); + + let target_slot = epoch.start_slot(E::slots_per_epoch()); + let target_root = if state.slot() <= target_slot { + beacon_block_root + } else { + *state.get_block_root(target_slot)? + }; + + let mut attestation: Attestation = Attestation::empty_for_signing( + index, + committee_len, + slot, + beacon_block_root, + state.current_justified_checkpoint(), + Checkpoint { + epoch, + root: target_root, + }, + &self.spec, + )?; + + let attestation = match attestation { + Attestation::Electra(mut attn) => { + attn.aggregation_bits.set(aggregation_bit_index, true).unwrap(); + attn + }, + Attestation::Base(_) => panic!("MUST BE AN ELECTRA ATTESTATION"), + }; + + let committee = state.get_beacon_committee(slot, index)?; + + // let committees = state.get_beacon_committees_at_epoch(RelativeEpoch::Current)?; + + let single_attestation = attestation.to_single_attestation( + &vec![committee.clone()] + )?; + + let attestation: Attestation = single_attestation.to_attestation(&vec![committee])?; + + assert_eq!(single_attestation.committee_index, attestation.committee_index().unwrap() as usize); + assert_eq!(single_attestation.attester_index, validator_index); + // assert_eq!(single_attestation.attester_index, attestation.attester_index()); + Ok(single_attestation) + } + + /// Produces an "unaggregated" attestation for the given `slot` and `index` that attests to /// `beacon_block_root`. The provided `state` should match the `block.state_root` for the /// `block` identified by `beacon_block_root`. @@ -1080,6 +1158,33 @@ where )?) } + /// A list of attestations for each committee for the given slot. + /// + /// The first layer of the Vec is organised per committee. For example, if the return value is + /// called `all_attestations`, then all attestations in `all_attestations[0]` will be for + /// committee 0, whilst all in `all_attestations[1]` will be for committee 1. + pub fn make_single_attestations( + &self, + attesting_validators: &[usize], + state: &BeaconState, + state_root: Hash256, + head_block_root: SignedBeaconBlockHash, + attestation_slot: Slot, + ) -> Vec { + let fork = self + .spec + .fork_at_epoch(attestation_slot.epoch(E::slots_per_epoch())); + self.make_single_attestations_with_opts( + attesting_validators, + state, + state_root, + head_block_root, + attestation_slot, + MakeAttestationOptions { limit: None, fork }, + ) + .0 + } + /// A list of attestations for each committee for the given slot. /// /// The first layer of the Vec is organised per committee. For example, if the return value is @@ -1107,6 +1212,99 @@ where .0 } + pub fn make_single_attestations_with_opts( + &self, + attesting_validators: &[usize], + state: &BeaconState, + state_root: Hash256, + head_block_root: SignedBeaconBlockHash, + attestation_slot: Slot, + opts: MakeAttestationOptions, + ) -> (Vec, Vec) { + let MakeAttestationOptions { limit, fork } = opts; + let committee_count = state.get_committee_count_at_slot(state.slot()).unwrap(); + let num_attesters = AtomicUsize::new(0); + + let (attestations, split_attesters) = state + .get_beacon_committees_at_slot(attestation_slot) + .expect("should get committees") + .iter() + .map(|bc| { + bc.committee + .par_iter() + .enumerate() + .filter_map(|(i, validator_index)| { + if !attesting_validators.contains(validator_index) { + return None; + } + + if let Some(limit) = limit { + // This atomics stuff is necessary because we're under a par_iter, + // and Rayon will deadlock if we use a mutex. + if num_attesters.fetch_add(1, Ordering::Relaxed) >= limit { + num_attesters.fetch_sub(1, Ordering::Relaxed); + return None; + } + } + + let mut attestation = self + .produce_single_attestation_for_block( + attestation_slot, + bc.index, + head_block_root.into(), + Cow::Borrowed(state), + state_root, + i, + *validator_index + ) + .unwrap(); + + attestation.signature = { + let domain = self.spec.get_domain( + attestation.data.target.epoch, + Domain::BeaconAttester, + &fork, + state.genesis_validators_root(), + ); + + let message = attestation.data.signing_root(domain); + + let mut agg_sig = AggregateSignature::infinity(); + + agg_sig.add_assign( + &self.validator_keypairs[*validator_index].sk.sign(message), + ); + + agg_sig + }; + + let subnet_id = SubnetId::compute_subnet_for_single_attestation::( + attestation.clone(), + committee_count, + &self.chain.spec, + ) + .unwrap(); + + Some(((attestation, subnet_id), validator_index)) + }) + .unzip::<_, _, Vec<_>, Vec<_>>() + }) + .unzip::<_, _, Vec<_>, Vec<_>>(); + + // Flatten attesters. + let attesters = split_attesters.into_iter().flatten().collect::>(); + + if let Some(limit) = limit { + assert_eq!(limit, num_attesters.load(Ordering::Relaxed)); + assert_eq!( + limit, + attesters.len(), + "failed to generate `limit` attestations" + ); + } + (attestations, attesters) + } + pub fn make_unaggregated_attestations_with_opts( &self, attesting_validators: &[usize], @@ -1287,6 +1485,32 @@ where ) } + /// A list of attestations for each committee for the given slot. + /// + /// The first layer of the Vec is organised per committee. For example, if the return value is + /// called `all_attestations`, then all attestations in `all_attestations[0]` will be for + /// committee 0, whilst all in `all_attestations[1]` will be for committee 1. + pub fn get_single_attestations( + &self, + attestation_strategy: &AttestationStrategy, + state: &BeaconState, + state_root: Hash256, + head_block_root: Hash256, + attestation_slot: Slot, + ) -> Vec> { + let validators: Vec = match attestation_strategy { + AttestationStrategy::AllValidators => self.get_all_validators(), + AttestationStrategy::SomeValidators(vals) => vals.clone(), + }; + self.make_single_attestations( + &validators, + state, + state_root, + head_block_root.into(), + attestation_slot, + ) + } + pub fn make_attestations( &self, attesting_validators: &[usize], diff --git a/beacon_node/http_api/tests/fork_tests.rs b/beacon_node/http_api/tests/fork_tests.rs index 8cb6053e9f..52b2e12619 100644 --- a/beacon_node/http_api/tests/fork_tests.rs +++ b/beacon_node/http_api/tests/fork_tests.rs @@ -155,10 +155,10 @@ async fn attestations_across_fork_with_skip_slots() { .post_beacon_pool_attestations_v1(&unaggregated_attestations) .await .unwrap(); - client - .post_beacon_pool_attestations_v2(&unaggregated_attestations, fork_name) - .await - .unwrap(); + // client + // .post_beacon_pool_attestations_v2(&unaggregated_attestations, fork_name) + // .await + // .unwrap(); let signed_aggregates = attestations .into_iter() diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index c3ed334782..ee7c1b9825 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -853,6 +853,53 @@ pub async fn fork_choice_before_proposal() { assert_eq!(block_d.parent_root(), Hash256::from(block_root_b)); } +// Test that attestations to unknown blocks are requeued and processed when their block arrives. +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn valid_single_attestation() { + let validator_count = 128; + let all_validators = (0..validator_count).collect::>(); + + let tester = InteractiveTester::::new(None, validator_count).await; + let harness = &tester.harness; + let client = tester.client.clone(); + + let num_initial = 5; + + // Slot of the block attested to. + let attestation_slot = Slot::new(num_initial) + 1; + + // Make some initial blocks. + harness.advance_slot(); + harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + harness.advance_slot(); + assert_eq!(harness.get_current_slot(), attestation_slot); + + // Make the attested-to block without applying it. + let pre_state = harness.get_current_state(); + let (block, post_state) = harness.make_block(pre_state, attestation_slot).await; + let block_root = block.0.canonical_root(); + + // Make attestations to the block and POST them to the beacon node on a background thread. + let attestations = harness + .make_single_attestations( + &all_validators, + &post_state, + block.0.state_root(), + block_root.into(), + attestation_slot, + ) + .into_iter() + .flat_map(|attestations| attestations.into_iter().map(|(att, _subnet)| att)) + .collect::>(); +} + // Test that attestations to unknown blocks are requeued and processed when their block arrives. #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn queue_attestations_from_http() { @@ -902,7 +949,7 @@ async fn queue_attestations_from_http() { let fork_name = tester.harness.spec.fork_name_at_slot::(attestation_slot); let attestation_future = tokio::spawn(async move { client - .post_beacon_pool_attestations_v2(&attestations, fork_name) + .post_beacon_pool_attestations_v1(&attestations) .await .expect("attestations should be processed successfully") }); diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 940f3ae9c0..cc08124e4d 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1810,7 +1810,7 @@ impl ApiTester { .map(|att| self.chain.spec.fork_name_at_slot::(att.data().slot)) .unwrap(); self.client - .post_beacon_pool_attestations_v2(self.attestations.as_slice(), fork_name) + .post_beacon_pool_attestations_v1(self.attestations.as_slice()) .await .unwrap(); assert!( @@ -1876,7 +1876,7 @@ impl ApiTester { let err_v2 = self .client - .post_beacon_pool_attestations_v2(attestations.as_slice(), fork_name) + .post_beacon_pool_attestations_v1(attestations.as_slice()) .await .unwrap_err(); diff --git a/consensus/types/src/attestation.rs b/consensus/types/src/attestation.rs index 144ce0e105..a7ede26083 100644 --- a/consensus/types/src/attestation.rs +++ b/consensus/types/src/attestation.rs @@ -413,28 +413,32 @@ impl AttestationElectra { .first() .ok_or(Error::InvalidAggregationBit)?; + println!("committees! {:?}", committees); + let beacon_committee = committees .get(committee_index as usize) .ok_or(Error::InvalidCommitteeIndex)?; - let attester_indices = beacon_committee + println!("agg bit {} committee index {}", aggregation_bit, committee_index); + + + println!("agg bit {}", aggregation_bit); + println!("agg bit {} committees {:?}", aggregation_bit, beacon_committee.committee); + let attester_index = beacon_committee .committee .iter() .enumerate() - .filter_map(|(i, &index)| { + .find_map(|(i, &index)| { + println!("agg bit {} val index {}", aggregation_bit, index); + println!("agg bit {} ith {}", aggregation_bit, i); if aggregation_bit as usize == i { + println!("agg bit {} RETURNED INDEX {}", aggregation_bit, index); return Some(index); } None - }) - .collect::>(); + }); - if attester_indices.len() != 1 { - return Err(Error::InvalidAggregationBit); - }; - - let attester_index = *attester_indices - .first() + let attester_index = attester_index .ok_or(Error::InvalidAggregationBit)?; Ok(SingleAttestation { @@ -674,36 +678,39 @@ impl SingleAttestation { &self, committees: &[BeaconCommittee], ) -> Result, Error> { - let beacon_committee = committees - .get(self.committee_index) - .ok_or(Error::InvalidAggregationBit)?; - let aggregation_bits = beacon_committee - .committee - .iter() - .enumerate() - .filter_map(|(i, &validator_index)| { - if self.attester_index == validator_index { - return Some(i); - } - None - }) - .collect::>(); - if aggregation_bits.len() != 1 { - return Err(Error::InvalidAggregationBit); + let mut committee_offset = 0; + let mut aggregation_bit: Option = None; + for beacon_committee in committees { + if beacon_committee.index == self.committee_index as u64 { + aggregation_bit = beacon_committee + .committee + .iter() + .enumerate() + .find_map(|(i, &validator_index)| { + if self.attester_index == validator_index { + return Some(i + committee_offset); + } + None + }); + committee_offset += beacon_committee.committee.len(); + break; + } else { + committee_offset += beacon_committee.committee.len(); + } } - - let aggregation_bit = aggregation_bits.first().unwrap(); + + let aggregation_bit = aggregation_bit.ok_or(Error::InvalidAggregationBit)?; let mut committee_bits: BitVector = BitVector::default(); committee_bits .set(self.committee_index, true) .map_err(|_| Error::InvalidCommitteeIndex)?; - let mut aggregation_bits = BitList::with_capacity(beacon_committee.committee.len()) + let mut aggregation_bits = BitList::with_capacity(committee_offset) .map_err(|_| Error::InvalidCommitteeLength)?; - aggregation_bits.set(*aggregation_bit, true)?; + aggregation_bits.set(aggregation_bit, true)?; Ok(Attestation::Electra(AttestationElectra { aggregation_bits, diff --git a/consensus/types/src/subnet_id.rs b/consensus/types/src/subnet_id.rs index 187b070d29..54beaa3dff 100644 --- a/consensus/types/src/subnet_id.rs +++ b/consensus/types/src/subnet_id.rs @@ -1,4 +1,5 @@ //! Identifies each shard by an integer identifier. +use crate::attestation::SingleAttestation; use crate::{AttestationRef, ChainSpec, CommitteeIndex, EthSpec, Slot}; use alloy_primitives::{bytes::Buf, U256}; use safe_arith::{ArithError, SafeArith}; @@ -57,6 +58,21 @@ impl SubnetId { ) } + /// Compute the subnet for an attestation where each slot in the + /// attestation epoch contains `committee_count_per_slot` committees. + pub fn compute_subnet_for_single_attestation( + attestation: SingleAttestation, + committee_count_per_slot: u64, + spec: &ChainSpec, + ) -> Result { + Self::compute_subnet::( + attestation.data.slot, + attestation.committee_index as u64, + committee_count_per_slot, + spec, + ) + } + /// Compute the subnet for an attestation with `attestation.data.slot == slot` and /// `attestation.data.index == committee_index` where each slot in the attestation epoch /// contains `committee_count_at_slot` committees. diff --git a/validator_client/slashing_protection/src/attestation_tests.rs b/validator_client/slashing_protection/src/attestation_tests.rs index b577ccd9d8..3f3782950c 100644 --- a/validator_client/slashing_protection/src/attestation_tests.rs +++ b/validator_client/slashing_protection/src/attestation_tests.rs @@ -31,6 +31,7 @@ fn signed_att(attestation: &AttestationData) -> SignedAttestation { SignedAttestation::from_attestation(attestation, DEFAULT_DOMAIN) } + #[test] fn valid_empty_history() { StreamTest {