diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 6cb8f6fe0b..fa51c3e354 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -256,6 +256,7 @@ pub fn prometheus_metrics() -> warp::filters::log::Log( ); // GET validator/aggregate_attestation?attestation_data_root,slot - let get_validator_aggregate_attestation = eth_v1 + let get_validator_aggregate_attestation = any_version .and(warp::path("validator")) .and(warp::path("aggregate_attestation")) .and(warp::path::end()) @@ -3184,11 +3185,24 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .then( - |query: api_types::ValidatorAggregateAttestationQuery, + |endpoint_version: EndpointVersion, + query: api_types::ValidatorAggregateAttestationQuery, not_synced_filter: Result<(), Rejection>, task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P0, move || { + if endpoint_version == V2 { + if query.committee_index.is_none() { + return Err(warp_utils::reject::custom_bad_request( + "missing committee index".to_string(), + )); + } + } else if endpoint_version == V1 { + // Do nothing + } else { + return Err(unsupported_version_rejection(endpoint_version)); + } + //TODO(electra) pass the index into the next method. not_synced_filter?; chain .get_aggregated_attestation_by_slot_and_root( diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 20c22cf12f..5e22a496a6 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -3191,30 +3191,52 @@ impl ApiTester { } pub async fn test_get_validator_aggregate_attestation(self) -> Self { - let attestation = self + if self .chain - .head_beacon_block() - .message() - .body() - .attestations() - .next() - .unwrap() - .clone_as_attestation(); + .spec + .fork_name_at_slot::(self.chain.slot().unwrap()) + >= ForkName::Electra + { + for attestation in self.chain.naive_aggregation_pool.read().iter() { + let result = self + .client + .get_validator_aggregate_attestation_v2( + attestation.data().slot, + attestation.data().tree_hash_root(), + attestation.committee_index(), + ) + .await + .unwrap() + .unwrap() + .data; + let expected = attestation; - let result = self - .client - .get_validator_aggregate_attestation( - attestation.data().slot, - attestation.data().tree_hash_root(), - ) - .await - .unwrap() - .unwrap() - .data; + assert_eq!(&result, expected); + } + } else { + let attestation = self + .chain + .head_beacon_block() + .message() + .body() + .attestations() + .next() + .unwrap() + .clone_as_attestation(); + let result = self + .client + .get_validator_aggregate_attestation_v1( + attestation.data().slot, + attestation.data().tree_hash_root(), + ) + .await + .unwrap() + .unwrap() + .data; + let expected = attestation; - let expected = attestation; - - assert_eq!(result, expected); + assert_eq!(result, expected); + } self } diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index d8b2c8ef2d..532429df3b 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -2179,8 +2179,8 @@ impl BeaconNodeHttpClient { self.get_with_timeout(path, self.timeouts.attestation).await } - /// `GET validator/aggregate_attestation?slot,attestation_data_root` - pub async fn get_validator_aggregate_attestation( + /// `GET v1/validator/aggregate_attestation?slot,attestation_data_root` + pub async fn get_validator_aggregate_attestation_v1( &self, slot: Slot, attestation_data_root: Hash256, @@ -2203,6 +2203,32 @@ impl BeaconNodeHttpClient { .await } + /// `GET v2/validator/aggregate_attestation?slot,attestation_data_root,committee_index` + pub async fn get_validator_aggregate_attestation_v2( + &self, + slot: Slot, + attestation_data_root: Hash256, + committee_index: CommitteeIndex, + ) -> Result>>, Error> { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("aggregate_attestation"); + + path.query_pairs_mut() + .append_pair("slot", &slot.to_string()) + .append_pair( + "attestation_data_root", + &format!("{:?}", attestation_data_root), + ) + .append_pair("committee_index", &committee_index.to_string()); + + self.get_opt_with_timeout(path, self.timeouts.attestation) + .await + } + /// `GET validator/sync_committee_contribution` pub async fn get_validator_sync_committee_contribution( &self, diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index b15246e7fd..2f6e7fe0e8 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -765,6 +765,8 @@ pub struct ValidatorAttestationDataQuery { pub struct ValidatorAggregateAttestationQuery { pub attestation_data_root: Hash256, pub slot: Slot, + #[serde(skip_serializing_if = "Option::is_none")] + pub committee_index: Option, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)] diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index 5b7f31867b..062a9436e9 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -14,6 +14,7 @@ use std::ops::Deref; use std::sync::Arc; use tokio::time::{sleep, sleep_until, Duration, Instant}; use tree_hash::TreeHash; +use types::ForkName; use types::{ attestation::AttestationBase, AggregateSignature, Attestation, AttestationData, BitList, ChainSpec, CommitteeIndex, EthSpec, Slot, @@ -290,17 +291,21 @@ impl AttestationService { // Then download, sign and publish a `SignedAggregateAndProof` for each // validator that is elected to aggregate for this `slot` and // `committee_index`. - self.produce_and_publish_aggregates(&attestation_data, &validator_duties) - .await - .map_err(move |e| { - crit!( - log, - "Error during attestation routine"; - "error" => format!("{:?}", e), - "committee_index" => committee_index, - "slot" => slot.as_u64(), - ) - })?; + self.produce_and_publish_aggregates( + &attestation_data, + committee_index, + &validator_duties, + ) + .await + .map_err(move |e| { + crit!( + log, + "Error during attestation routine"; + "error" => format!("{:?}", e), + "committee_index" => committee_index, + "slot" => slot.as_u64(), + ) + })?; } Ok(()) @@ -493,6 +498,7 @@ impl AttestationService { async fn produce_and_publish_aggregates( &self, attestation_data: &AttestationData, + committee_index: CommitteeIndex, validator_duties: &[DutyAndProof], ) -> Result<(), String> { let log = self.context.log(); @@ -505,6 +511,12 @@ impl AttestationService { return Ok(()); } + let fork_name = self + .context + .eth2_config + .spec + .fork_name_at_slot::(attestation_data.slot); + let aggregated_attestation = &self .beacon_nodes .first_success( @@ -515,12 +527,24 @@ impl AttestationService { &metrics::ATTESTATION_SERVICE_TIMES, &[metrics::AGGREGATES_HTTP_GET], ); - beacon_node - .get_validator_aggregate_attestation( - attestation_data.slot, - attestation_data.tree_hash_root(), - ) - .await + let aggregate_attestation_result = if fork_name >= ForkName::Electra { + beacon_node + .get_validator_aggregate_attestation_v2( + attestation_data.slot, + attestation_data.tree_hash_root(), + committee_index, + ) + .await + } else { + beacon_node + .get_validator_aggregate_attestation_v1( + attestation_data.slot, + attestation_data.tree_hash_root(), + ) + .await + }; + + aggregate_attestation_result .map_err(|e| { format!("Failed to produce an aggregate attestation: {:?}", e) })?