Added more test coverage, simplified Attestation conversion, and other minor refactors

This commit is contained in:
Eitan Seri-Levi
2025-01-06 16:30:58 +07:00
parent 4700ef9798
commit c7ef72d01e
12 changed files with 181 additions and 188 deletions

View File

@@ -325,14 +325,16 @@ impl<T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'_, T> {
self.indexed_attestation
}
pub fn single_attestation(&self) -> SingleAttestation {
// TODO(single-attestation) unwrap
SingleAttestation {
committee_index: self.attestation.committee_index().unwrap_or(0) as usize,
pub fn single_attestation(&self) -> Option<SingleAttestation> {
let Some(committee_index) = self.attestation.committee_index() else {
return None;
};
Some(SingleAttestation {
committee_index: committee_index as usize,
attester_index: self.validator_index,
data: self.attestation.data().clone(),
signature: self.attestation.signature().clone(),
}
})
}
}

View File

@@ -2038,9 +2038,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.spec
.fork_name_at_slot::<T::EthSpec>(v.attestation().data().slot);
if current_fork.electra_enabled() {
event_handler.register(EventKind::SingleAttestation(Box::new(
v.single_attestation(),
)));
// I don't see a situation where this could return None. The upstream unaggregated attestation checks
// should have already verified that this is an attestation with a single committee bit set.
if let Some(single_attestation) = v.single_attestation() {
event_handler.register(EventKind::SingleAttestation(Box::new(
single_attestation,
)));
}
} else {
event_handler.register(EventKind::Attestation(Box::new(
v.attestation().clone_as_attestation(),

View File

@@ -668,8 +668,10 @@ pub type CommitteeAttestations<E> = Vec<(Attestation<E>, SubnetId)>;
pub type HarnessAttestations<E> =
Vec<(CommitteeAttestations<E>, Option<SignedAggregateAndProof<E>>)>;
pub type HarnessSingleAttestations<E> =
Vec<(CommitteeSingleAttestations, Option<SignedAggregateAndProof<E>>)>;
pub type HarnessSingleAttestations<E> = Vec<(
CommitteeSingleAttestations,
Option<SignedAggregateAndProof<E>>,
)>;
pub type HarnessSyncContributions<E> = Vec<(
Vec<(SyncCommitteeMessage, usize)>,
@@ -1028,6 +1030,7 @@ where
)
}
#[allow(clippy::too_many_arguments)]
pub fn produce_single_attestation_for_block(
&self,
slot: Slot,
@@ -1038,7 +1041,6 @@ where
aggregation_bit_index: usize,
validator_index: usize,
) -> Result<SingleAttestation, BeaconChainError> {
let epoch = slot.epoch(E::slots_per_epoch());
if state.slot() > slot {
@@ -1063,7 +1065,7 @@ where
*state.get_block_root(target_slot)?
};
let mut attestation: Attestation<E> = Attestation::empty_for_signing(
let attestation: Attestation<E> = Attestation::empty_for_signing(
index,
committee_len,
slot,
@@ -1078,29 +1080,29 @@ where
let attestation = match attestation {
Attestation::Electra(mut attn) => {
attn.aggregation_bits.set(aggregation_bit_index, true).unwrap();
attn.aggregation_bits
.set(aggregation_bit_index, true)
.unwrap();
attn
},
}
Attestation::Base(_) => panic!("MUST BE AN ELECTRA ATTESTATION"),
};
let committee = state.get_beacon_committee(slot, index)?;
let committee = state.get_beacon_committee(slot, index).unwrap();
// let committees = state.get_beacon_committees_at_epoch(RelativeEpoch::Current)?;
let single_attestation = attestation.to_single_attestation(Some(committee.clone()))?;
let single_attestation = attestation.to_single_attestation(
&vec![committee.clone()]
)?;
let attestation: Attestation<E> = single_attestation.to_attestation(Some(committee))?;
let attestation: Attestation<E> = single_attestation.to_attestation(&vec![committee])?;
assert_eq!(single_attestation.committee_index, attestation.committee_index().unwrap() as usize);
assert_eq!(
single_attestation.committee_index,
attestation.committee_index().unwrap() as usize
);
assert_eq!(single_attestation.attester_index, validator_index);
// assert_eq!(single_attestation.attester_index, attestation.attester_index());
Ok(single_attestation)
}
/// Produces an "unaggregated" attestation for the given `slot` and `index` that attests to
/// `beacon_block_root`. The provided `state` should match the `block.state_root` for the
/// `block` identified by `beacon_block_root`.
@@ -1158,7 +1160,7 @@ where
)?)
}
/// A list of attestations for each committee for the given slot.
/// A list of attestations for each committee for the given slot.
///
/// The first layer of the Vec is organised per committee. For example, if the return value is
/// called `all_attestations`, then all attestations in `all_attestations[0]` will be for
@@ -1255,7 +1257,7 @@ where
Cow::Borrowed(state),
state_root,
i,
*validator_index
*validator_index,
)
.unwrap();
@@ -1485,7 +1487,7 @@ where
)
}
/// A list of attestations for each committee for the given slot.
/// A list of attestations for each committee for the given slot.
///
/// The first layer of the Vec is organised per committee. For example, if the return value is
/// called `all_attestations`, then all attestations in `all_attestations[0]` will be for

View File

@@ -81,12 +81,13 @@ use tokio_stream::{
StreamExt,
};
use types::{
fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId,
AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName,
ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch,
SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
SyncCommitteeMessage, SyncContributionData, attestation::SingleAttestation
attestation::SingleAttestation, fork_versioned_response::EmptyMetadata, Attestation,
AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError, CommitteeCache,
ConfigAndPreset, Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256,
ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof,
SignedBlindedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof,
SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage,
SyncContributionData,
};
use validator::pubkey_to_validator_index;
use version::{
@@ -1879,9 +1880,6 @@ pub fn serve<T: BeaconChainTypes>(
.and(reprocess_send_filter)
.and(log_filter.clone())
.then(
// V1 and V2 are identical except V2 has a consensus version header in the request.
// We only require this header for SSZ deserialization, which isn't supported for
// this endpoint presently.
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
attestations: Vec<SingleAttestation>,

View File

@@ -100,10 +100,14 @@ fn verify_and_publish_attestation<T: BeaconChainTypes>(
attn.data.target.root,
attn.data.slot.epoch(T::EthSpec::slots_per_epoch()),
|committee_cache, _| {
let committees =
committee_cache.get_beacon_committees_at_slot(attn.data.slot)?;
let committee_index = attn
.committee_index()
.ok_or(BeaconChainError::AttestationCommitteeIndexNotSet)?;
let single_attestation = attn.to_single_attestation(&committees)?;
let committee =
committee_cache.get_beacon_committee(attn.data.slot, committee_index);
let single_attestation = attn.to_single_attestation(committee)?;
network_tx
.send(NetworkMessage::Publish {
@@ -166,6 +170,7 @@ pub async fn publish_single_attestations<T: BeaconChainTypes>(
log: Logger,
) -> Result<(), warp::Rejection> {
let mut attestations = vec![];
for single_attestation in single_attestations {
let attestation = chain.with_committee_cache(
single_attestation.data.target.root,
@@ -174,10 +179,12 @@ pub async fn publish_single_attestations<T: BeaconChainTypes>(
.slot
.epoch(T::EthSpec::slots_per_epoch()),
|committee_cache, _| {
let committees =
committee_cache.get_beacon_committees_at_slot(single_attestation.data.slot)?;
let committee = committee_cache.get_beacon_committee(
single_attestation.data.slot,
single_attestation.committee_index as u64,
);
let attestation = single_attestation.to_attestation::<T::EthSpec>(&committees)?;
let attestation = single_attestation.to_attestation::<T::EthSpec>(committee)?;
Ok(attestation)
},

View File

@@ -858,53 +858,6 @@ pub async fn fork_choice_before_proposal() {
assert_eq!(block_d.parent_root(), Hash256::from(block_root_b));
}
// Test that attestations to unknown blocks are requeued and processed when their block arrives.
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn valid_single_attestation() {
let validator_count = 128;
let all_validators = (0..validator_count).collect::<Vec<_>>();
let tester = InteractiveTester::<E>::new(None, validator_count).await;
let harness = &tester.harness;
let client = tester.client.clone();
let num_initial = 5;
// Slot of the block attested to.
let attestation_slot = Slot::new(num_initial) + 1;
// Make some initial blocks.
harness.advance_slot();
harness
.extend_chain(
num_initial as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
harness.advance_slot();
assert_eq!(harness.get_current_slot(), attestation_slot);
// Make the attested-to block without applying it.
let pre_state = harness.get_current_state();
let (block, post_state) = harness.make_block(pre_state, attestation_slot).await;
let block_root = block.0.canonical_root();
// Make attestations to the block and POST them to the beacon node on a background thread.
let attestations = harness
.make_single_attestations(
&all_validators,
&post_state,
block.0.state_root(),
block_root.into(),
attestation_slot,
)
.into_iter()
.flat_map(|attestations| attestations.into_iter().map(|(att, _subnet)| att))
.collect::<Vec<_>>();
}
// Test that attestations to unknown blocks are requeued and processed when their block arrives.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn queue_attestations_from_http() {
@@ -937,27 +890,48 @@ async fn queue_attestations_from_http() {
let pre_state = harness.get_current_state();
let (block, post_state) = harness.make_block(pre_state, attestation_slot).await;
let block_root = block.0.canonical_root();
let fork_name = tester.harness.spec.fork_name_at_slot::<E>(attestation_slot);
// Make attestations to the block and POST them to the beacon node on a background thread.
let attestations = harness
.make_unaggregated_attestations(
&all_validators,
&post_state,
block.0.state_root(),
block_root.into(),
attestation_slot,
)
.into_iter()
.flat_map(|attestations| attestations.into_iter().map(|(att, _subnet)| att))
.collect::<Vec<_>>();
let attestation_future = if fork_name.electra_enabled() {
let single_attestations = harness
.make_single_attestations(
&all_validators,
&post_state,
block.0.state_root(),
block_root.into(),
attestation_slot,
)
.into_iter()
.flat_map(|attestations| attestations.into_iter().map(|(att, _subnet)| att))
.collect::<Vec<_>>();
let fork_name = tester.harness.spec.fork_name_at_slot::<E>(attestation_slot);
let attestation_future = tokio::spawn(async move {
client
.post_beacon_pool_attestations_v1(&attestations)
.await
.expect("attestations should be processed successfully")
});
tokio::spawn(async move {
client
.post_beacon_pool_attestations_v2(&single_attestations, fork_name)
.await
.expect("attestations should be processed successfully")
})
} else {
let attestations = harness
.make_unaggregated_attestations(
&all_validators,
&post_state,
block.0.state_root(),
block_root.into(),
attestation_slot,
)
.into_iter()
.flat_map(|attestations| attestations.into_iter().map(|(att, _subnet)| att))
.collect::<Vec<_>>();
tokio::spawn(async move {
client
.post_beacon_pool_attestations_v1(&attestations)
.await
.expect("attestations should be processed successfully")
})
};
// In parallel, apply the block. We need to manually notify the reprocess queue, because the
// `beacon_chain` does not know about the queue and will not update it for us.

View File

@@ -39,8 +39,9 @@ use tokio::time::Duration;
use tree_hash::TreeHash;
use types::application_domain::ApplicationDomain;
use types::{
attestation::AttestationBase, AggregateSignature, BitList, Domain, EthSpec, ExecutionBlockHash,
Hash256, Keypair, MainnetEthSpec, RelativeEpoch, SelectionProof, SignedRoot, Slot,
attestation::AttestationBase, attestation::SingleAttestation, AggregateSignature, BitList,
Domain, EthSpec, ExecutionBlockHash, Hash256, Keypair, MainnetEthSpec, RelativeEpoch,
SelectionProof, SignedRoot, Slot,
};
type E = MainnetEthSpec;
@@ -71,6 +72,7 @@ struct ApiTester {
next_block: PublishBlockRequest<E>,
reorg_block: PublishBlockRequest<E>,
attestations: Vec<Attestation<E>>,
single_attestations: Vec<SingleAttestation>,
contribution_and_proofs: Vec<SignedContributionAndProof<E>>,
attester_slashing: AttesterSlashing<E>,
proposer_slashing: ProposerSlashing,
@@ -203,6 +205,27 @@ impl ApiTester {
"precondition: attestations for testing"
);
let fork_name = harness
.chain
.spec
.fork_name_at_slot::<E>(harness.chain.slot().unwrap());
let single_attestations = if fork_name.electra_enabled() {
harness
.get_single_attestations(
&AttestationStrategy::AllValidators,
&head.beacon_state,
head_state_root,
head.beacon_block_root,
harness.chain.slot().unwrap(),
)
.into_iter()
.flat_map(|vec| vec.into_iter().map(|(attestation, _subnet_id)| attestation))
.collect::<Vec<_>>()
} else {
vec![]
};
let current_epoch = harness
.chain
.slot()
@@ -294,6 +317,7 @@ impl ApiTester {
next_block,
reorg_block,
attestations,
single_attestations,
contribution_and_proofs,
attester_slashing,
proposer_slashing,
@@ -381,6 +405,7 @@ impl ApiTester {
next_block,
reorg_block,
attestations,
single_attestations: vec![],
contribution_and_proofs: vec![],
attester_slashing,
proposer_slashing,
@@ -1801,12 +1826,12 @@ impl ApiTester {
pub async fn test_post_beacon_pool_attestations_valid_v2(mut self) -> Self {
let fork_name = self
.attestations
.single_attestations
.first()
.map(|att| self.chain.spec.fork_name_at_slot::<E>(att.data().slot))
.map(|att| self.chain.spec.fork_name_at_slot::<E>(att.data.slot))
.unwrap();
self.client
.post_beacon_pool_attestations_v1(self.attestations.as_slice())
.post_beacon_pool_attestations_v2(self.single_attestations.as_slice(), fork_name)
.await
.unwrap();
assert!(
@@ -1855,9 +1880,9 @@ impl ApiTester {
}
pub async fn test_post_beacon_pool_attestations_invalid_v2(mut self) -> Self {
let mut attestations = Vec::new();
for attestation in &self.attestations {
for attestation in &self.single_attestations {
let mut invalid_attestation = attestation.clone();
invalid_attestation.data_mut().slot += 1;
invalid_attestation.data.slot += 1;
// add both to ensure we only fail on invalid attestations
attestations.push(attestation.clone());
@@ -1872,7 +1897,7 @@ impl ApiTester {
let err_v2 = self
.client
.post_beacon_pool_attestations_v1(attestations.as_slice())
.post_beacon_pool_attestations_v2(attestations.as_slice(), fork_name)
.await
.unwrap_err();

View File

@@ -103,10 +103,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.slot
.epoch(T::EthSpec::slots_per_epoch()),
|committee_cache, _| {
let committees =
committee_cache.get_beacon_committees_at_slot(single_attestation.data.slot)?;
let committee = committee_cache.get_beacon_committee(
single_attestation.data.slot,
single_attestation.committee_index as u64,
);
let attestation = single_attestation.to_attestation(&committees)?;
let attestation = single_attestation.to_attestation(committee)?;
Ok(self.send_unaggregated_attestation(
message_id.clone(),

View File

@@ -31,12 +31,12 @@ use reqwest_eventsource::{Event, EventSource};
pub use sensitive_url::{SensitiveError, SensitiveUrl};
use serde::{de::DeserializeOwned, Serialize};
use ssz::Encode;
use types::attestation::SingleAttestation;
use std::fmt;
use std::future::Future;
use std::path::PathBuf;
use std::time::Duration;
use store::fork_versioned_response::ExecutionOptimisticFinalizedForkVersionedResponse;
use types::attestation::SingleAttestation;
pub const V1: EndpointVersion = EndpointVersion(1);
pub const V2: EndpointVersion = EndpointVersion(2);
@@ -1325,7 +1325,7 @@ impl BeaconNodeHttpClient {
}
/// `POST v2/beacon/pool/attestations`
pub async fn post_beacon_pool_attestations_v2<E: EthSpec>(
pub async fn post_beacon_pool_attestations_v2(
&self,
attestations: &[SingleAttestation],
fork_name: ForkName,

View File

@@ -25,6 +25,7 @@ pub enum Error {
InvalidCommitteeLength,
InvalidCommitteeIndex,
InvalidAggregationBit,
InvalidCommittee,
}
impl From<ssz_types::Error> for Error {
@@ -233,10 +234,13 @@ impl<E: EthSpec> Attestation<E> {
}
}
pub fn to_single_attestation_with_attester_index(&self, attester_index: usize) -> Result<SingleAttestation, Error> {
pub fn to_single_attestation_with_attester_index(
&self,
attester_index: usize,
) -> Result<SingleAttestation, Error> {
match self {
Self::Base(_) => Err(Error::IncorrectStateVariant),
Self::Electra(attn) => attn.to_single_attestation_with_attester_index(attester_index)
Self::Electra(attn) => attn.to_single_attestation_with_attester_index(attester_index),
}
}
}
@@ -369,35 +373,30 @@ impl<E: EthSpec> AttestationElectra<E> {
pub fn to_single_attestation_with_attester_index(
&self,
attester_index: usize
attester_index: usize,
) -> Result<SingleAttestation, Error> {
let committee_indices = self.get_committee_indices();
if committee_indices.len() != 1 {
return Err(Error::InvalidCommitteeLength);
}
let committee_index = *committee_indices
.first()
.ok_or(Error::InvalidCommitteeIndex)?;
let Some(committee_index) = self.committee_index() else {
return Err(Error::InvalidCommitteeIndex);
};
Ok(SingleAttestation {
committee_index: committee_index as usize,
attester_index,
data: self.data.clone(),
signature: self.signature.clone()
signature: self.signature.clone(),
})
}
pub fn to_single_attestation(
&self,
committees: &[BeaconCommittee],
committee: Option<BeaconCommittee>,
) -> Result<SingleAttestation, Error> {
let committee_indices = self.get_committee_indices();
if committee_indices.len() != 1 {
return Err(Error::InvalidCommitteeLength);
}
let Some(committee) = committee else {
return Err(Error::InvalidCommittee);
};
let Some(committee_index) = self.committee_index() else {
return Err(Error::InvalidCommitteeIndex);
};
let aggregation_bits = self.get_aggregation_bits();
@@ -405,41 +404,22 @@ impl<E: EthSpec> AttestationElectra<E> {
return Err(Error::InvalidAggregationBit);
}
let committee_index = *committee_indices
.first()
.ok_or(Error::InvalidCommitteeIndex)?;
let aggregation_bit = *aggregation_bits
.first()
.ok_or(Error::InvalidAggregationBit)?;
println!("committees! {:?}", committees);
let beacon_committee = committees
.get(committee_index as usize)
.ok_or(Error::InvalidCommitteeIndex)?;
println!("agg bit {} committee index {}", aggregation_bit, committee_index);
println!("agg bit {}", aggregation_bit);
println!("agg bit {} committees {:?}", aggregation_bit, beacon_committee.committee);
let attester_index = beacon_committee
let attester_index = committee
.committee
.iter()
.enumerate()
.find_map(|(i, &index)| {
println!("agg bit {} val index {}", aggregation_bit, index);
println!("agg bit {} ith {}", aggregation_bit, i);
if aggregation_bit as usize == i {
println!("agg bit {} RETURNED INDEX {}", aggregation_bit, index);
return Some(index);
}
None
});
let attester_index = attester_index
.ok_or(Error::InvalidAggregationBit)?;
let attester_index = attester_index.ok_or(Error::InvalidAggregationBit)?;
Ok(SingleAttestation {
committee_index: committee_index as usize,
@@ -647,7 +627,6 @@ pub struct SingleAttestation {
impl SingleAttestation {
/// Produces a `SingleAttestation` with empty signature and empty attester index.
/// ONLY USE IN ELECTRA
pub fn empty_for_signing(
committee_index: usize,
slot: Slot,
@@ -676,38 +655,34 @@ impl SingleAttestation {
pub fn to_attestation<E: EthSpec>(
&self,
committees: &[BeaconCommittee],
committee: Option<BeaconCommittee>,
) -> Result<Attestation<E>, Error> {
let Some(committee) = committee else {
return Err(Error::InvalidCommittee);
};
let mut committee_offset = 0;
let mut aggregation_bit: Option<usize> = None;
for beacon_committee in committees {
if beacon_committee.index == self.committee_index as u64 {
aggregation_bit = beacon_committee
.committee
.iter()
.enumerate()
.find_map(|(i, &validator_index)| {
if self.attester_index == validator_index {
return Some(i + committee_offset);
}
None
});
committee_offset += beacon_committee.committee.len();
break;
} else {
committee_offset += beacon_committee.committee.len();
}
if committee.index != self.committee_index as u64 {
return Err(Error::InvalidCommittee);
}
let aggregation_bit = aggregation_bit.ok_or(Error::InvalidAggregationBit)?;
let aggregation_bit = committee
.committee
.iter()
.enumerate()
.find_map(|(i, &validator_index)| {
if self.attester_index == validator_index {
return Some(i);
}
None
})
.ok_or(Error::InvalidAggregationBit)?;
let mut committee_bits: BitVector<E::MaxCommitteesPerSlot> = BitVector::default();
committee_bits
.set(self.committee_index, true)
.map_err(|_| Error::InvalidCommitteeIndex)?;
let mut aggregation_bits = BitList::with_capacity(committee_offset)
let mut aggregation_bits = BitList::with_capacity(committee.committee.len())
.map_err(|_| Error::InvalidCommitteeLength)?;
aggregation_bits.set(aggregation_bit, true)?;

View File

@@ -31,7 +31,6 @@ fn signed_att(attestation: &AttestationData) -> SignedAttestation {
SignedAttestation::from_attestation(attestation, DEFAULT_DOMAIN)
}
#[test]
fn valid_empty_history() {
StreamTest {

View File

@@ -457,11 +457,16 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
&[validator_metrics::ATTESTATIONS_HTTP_POST],
);
if fork_name.electra_enabled() {
let single_attestations = attestations.iter().zip(validator_indices).filter_map(|(a, i)| {
a.to_single_attestation_with_attester_index(*i as usize).ok()
}).collect::<Vec<_>>();
let single_attestations = attestations
.iter()
.zip(validator_indices)
.filter_map(|(a, i)| {
a.to_single_attestation_with_attester_index(*i as usize)
.ok()
})
.collect::<Vec<_>>();
beacon_node
.post_beacon_pool_attestations_v2::<E>(&single_attestations, fork_name)
.post_beacon_pool_attestations_v2(&single_attestations, fork_name)
.await
} else {
beacon_node