From f405601d6f898e4693353ef9e72881da5eb27b58 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Mon, 8 Jul 2024 18:19:10 -0700 Subject: [PATCH] add client updates and test updates --- beacon_node/http_api/tests/fork_tests.rs | 13 +- .../http_api/tests/interactive_tests.rs | 3 +- beacon_node/http_api/tests/tests.rs | 136 ++++++++++-- common/eth2/src/lib.rs | 197 ++++++++++++++++-- consensus/types/src/attestation.rs | 25 ++- consensus/types/src/attester_slashing.rs | 23 ++ validator_client/src/attestation_service.rs | 38 +++- 7 files changed, 397 insertions(+), 38 deletions(-) diff --git a/beacon_node/http_api/tests/fork_tests.rs b/beacon_node/http_api/tests/fork_tests.rs index ad32ff1d57..08dd7f06f7 100644 --- a/beacon_node/http_api/tests/fork_tests.rs +++ b/beacon_node/http_api/tests/fork_tests.rs @@ -150,8 +150,13 @@ async fn attestations_across_fork_with_skip_slots() { .collect::>(); assert!(!unaggregated_attestations.is_empty()); + let fork_name = harness.spec.fork_name_at_slot::(fork_slot); client - .post_beacon_pool_attestations(&unaggregated_attestations) + .post_beacon_pool_attestations_v1(&unaggregated_attestations) + .await + .unwrap(); + client + .post_beacon_pool_attestations_v2(&unaggregated_attestations, fork_name) .await .unwrap(); @@ -162,7 +167,11 @@ async fn attestations_across_fork_with_skip_slots() { assert!(!signed_aggregates.is_empty()); client - .post_validator_aggregate_and_proof(&signed_aggregates) + .post_validator_aggregate_and_proof_v1(&signed_aggregates) + .await + .unwrap(); + client + .post_validator_aggregate_and_proof_v2(&signed_aggregates, fork_name) .await .unwrap(); } diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 711820ccac..a2c48b98c9 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -890,9 +890,10 @@ async fn queue_attestations_from_http() { .flat_map(|attestations| attestations.into_iter().map(|(att, _subnet)| att)) .collect::>(); + let fork_name = tester.harness.spec.fork_name_at_slot::(attestation_slot); let attestation_future = tokio::spawn(async move { client - .post_beacon_pool_attestations(&attestations) + .post_beacon_pool_attestations_v2(&attestations, fork_name) .await .expect("attestations should be processed successfully") }); diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 5baf96091b..6068b4a093 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1633,7 +1633,7 @@ impl ApiTester { for block_id in self.interesting_block_ids() { let result = self .client - .get_beacon_blocks_attestations(block_id.0) + .get_beacon_blocks_attestations_v2(block_id.0) .await .unwrap() .map(|res| res.data); @@ -1666,7 +1666,7 @@ impl ApiTester { pub async fn test_post_beacon_pool_attestations_valid(mut self) -> Self { self.client - .post_beacon_pool_attestations(self.attestations.as_slice()) + .post_beacon_pool_attestations_v1(self.attestations.as_slice()) .await .unwrap(); @@ -1675,6 +1675,20 @@ impl ApiTester { "valid attestation should be sent to network" ); + let fork_name = self + .attestations + .first() + .map(|att| self.chain.spec.fork_name_at_slot::(att.data().slot)) + .unwrap(); + self.client + .post_beacon_pool_attestations_v2(self.attestations.as_slice(), fork_name) + .await + .unwrap(); + assert!( + self.network_rx.network_recv.recv().await.is_some(), + "valid attestation should be sent to network" + ); + self } @@ -1691,7 +1705,7 @@ impl ApiTester { let err = self .client - .post_beacon_pool_attestations(attestations.as_slice()) + .post_beacon_pool_attestations_v1(attestations.as_slice()) .await .unwrap_err(); @@ -1712,6 +1726,35 @@ impl ApiTester { "if some attestations are valid, we should send them to the network" ); + let fork_name = self + .attestations + .first() + .map(|att| self.chain.spec.fork_name_at_slot::(att.data().slot)) + .unwrap(); + + let err_v2 = self + .client + .post_beacon_pool_attestations_v2(attestations.as_slice(), fork_name) + .await + .unwrap_err(); + + match err_v2 { + Error::ServerIndexedMessage(IndexedErrorMessage { + code, + message: _, + failures, + }) => { + assert_eq!(code, 400); + assert_eq!(failures.len(), self.attestations.len()); + } + _ => panic!("query did not fail correctly"), + } + + assert!( + self.network_rx.network_recv.recv().await.is_some(), + "if some attestations are valid, we should send them to the network" + ); + self } @@ -1777,7 +1820,7 @@ impl ApiTester { pub async fn test_get_beacon_pool_attestations(self) -> Self { let result = self .client - .get_beacon_pool_attestations(None, None) + .get_beacon_pool_attestations_v1(None, None) .await .unwrap() .data; @@ -1787,12 +1830,34 @@ impl ApiTester { assert_eq!(result, expected); + let result = self + .client + .get_beacon_pool_attestations_v2(None, None) + .await + .unwrap() + .data; + assert_eq!(result, expected); + self } pub async fn test_post_beacon_pool_attester_slashings_valid(mut self) -> Self { self.client - .post_beacon_pool_attester_slashings(&self.attester_slashing) + .post_beacon_pool_attester_slashings_v1(&self.attester_slashing) + .await + .unwrap(); + + assert!( + self.network_rx.network_recv.recv().await.is_some(), + "valid attester slashing should be sent to network" + ); + + let fork_name = self + .chain + .spec + .fork_name_at_slot::(self.attester_slashing.attestation_1().data().slot); + self.client + .post_beacon_pool_attester_slashings_v2(&self.attester_slashing, fork_name) .await .unwrap(); @@ -1816,7 +1881,21 @@ impl ApiTester { } self.client - .post_beacon_pool_attester_slashings(&slashing) + .post_beacon_pool_attester_slashings_v1(&slashing) + .await + .unwrap_err(); + + assert!( + self.network_rx.network_recv.recv().now_or_never().is_none(), + "invalid attester slashing should not be sent to network" + ); + + let fork_name = self + .chain + .spec + .fork_name_at_slot::(self.attester_slashing.attestation_1().data().slot); + self.client + .post_beacon_pool_attester_slashings_v2(&slashing, fork_name) .await .unwrap_err(); @@ -1831,7 +1910,7 @@ impl ApiTester { pub async fn test_get_beacon_pool_attester_slashings(self) -> Self { let result = self .client - .get_beacon_pool_attester_slashings() + .get_beacon_pool_attester_slashings_v1() .await .unwrap() .data; @@ -1840,6 +1919,14 @@ impl ApiTester { assert_eq!(result, expected); + let result = self + .client + .get_beacon_pool_attester_slashings_v2() + .await + .unwrap() + .data; + assert_eq!(result, expected); + self } @@ -3210,7 +3297,7 @@ impl ApiTester { .get_validator_aggregate_attestation_v2( attestation.data().slot, attestation.data().tree_hash_root(), - attestation.committee_index(), + attestation.committee_index().unwrap(), ) .await .unwrap() @@ -3346,7 +3433,18 @@ impl ApiTester { let aggregate = self.get_aggregate().await; self.client - .post_validator_aggregate_and_proof::(&[aggregate]) + .post_validator_aggregate_and_proof_v1::(&[aggregate.clone()]) + .await + .unwrap(); + + assert!(self.network_rx.network_recv.recv().await.is_some()); + + let fork_name = self + .chain + .spec + .fork_name_at_slot::(aggregate.message().aggregate().data().slot); + self.client + .post_validator_aggregate_and_proof_v2::(&[aggregate], fork_name) .await .unwrap(); @@ -3367,12 +3465,22 @@ impl ApiTester { } self.client - .post_validator_aggregate_and_proof::(&[aggregate]) + .post_validator_aggregate_and_proof_v1::(&[aggregate.clone()]) .await .unwrap_err(); assert!(self.network_rx.network_recv.recv().now_or_never().is_none()); + let fork_name = self + .chain + .spec + .fork_name_at_slot::(aggregate.message().aggregate().data().slot); + self.client + .post_validator_aggregate_and_proof_v2::(&[aggregate], fork_name) + .await + .unwrap(); + assert!(self.network_rx.network_recv.recv().now_or_never().is_none()); + self } @@ -3471,7 +3579,7 @@ impl ApiTester { pub async fn test_post_validator_register_validator_slashed(self) -> Self { // slash a validator self.client - .post_beacon_pool_attester_slashings(&self.attester_slashing) + .post_beacon_pool_attester_slashings_v1(&self.attester_slashing) .await .unwrap(); @@ -3584,7 +3692,7 @@ impl ApiTester { // Attest to the current slot self.client - .post_beacon_pool_attestations(self.attestations.as_slice()) + .post_beacon_pool_attestations_v1(self.attestations.as_slice()) .await .unwrap(); @@ -5224,7 +5332,7 @@ impl ApiTester { // Attest to the current slot self.client - .post_beacon_pool_attestations(self.attestations.as_slice()) + .post_beacon_pool_attestations_v1(self.attestations.as_slice()) .await .unwrap(); @@ -5279,7 +5387,7 @@ impl ApiTester { let expected_attestation_len = self.attestations.len(); self.client - .post_beacon_pool_attestations(self.attestations.as_slice()) + .post_beacon_pool_attestations_v1(self.attestations.as_slice()) .await .unwrap(); diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 532429df3b..ff85d6e64f 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -346,6 +346,19 @@ impl BeaconNodeHttpClient { Ok(()) } + /// Perform a HTTP POST request with a custom timeout. + async fn post_with_timeout_and_consensus_header( + &self, + url: U, + body: &T, + timeout: Duration, + fork_name: ForkName, + ) -> Result<(), Error> { + self.post_generic_with_consensus_version(url, body, Some(timeout), fork_name) + .await?; + Ok(()) + } + /// Perform a HTTP POST request with a custom timeout, returning a JSON response. async fn post_with_timeout_and_response( &self, @@ -395,6 +408,33 @@ impl BeaconNodeHttpClient { ok_or_error(response).await } + /// Generic POST function supporting arbitrary responses and timeouts. + /// Does not include Content-Type application/json in the request header. + async fn post_generic_json_without_content_type_header_but_with_consensus_header< + T: Serialize, + U: IntoUrl, + >( + &self, + url: U, + body: &T, + timeout: Option, + fork: ForkName, + ) -> Result { + let mut builder = self.client.post(url); + if let Some(timeout) = timeout { + builder = builder.timeout(timeout); + } + + let serialized_body = serde_json::to_vec(body).map_err(Error::InvalidJson)?; + + let response = builder + .header(CONSENSUS_VERSION_HEADER, fork.to_string()) + .body(serialized_body) + .send() + .await?; + ok_or_error(response).await + } + /// Generic POST function supporting arbitrary responses and timeouts. async fn post_generic_with_consensus_version( &self, @@ -1191,10 +1231,10 @@ impl BeaconNodeHttpClient { self.get_opt(path).await } - /// `GET beacon/blocks/{block_id}/attestations` + /// `GET v1/beacon/blocks/{block_id}/attestations` /// /// Returns `Ok(None)` on a 404 error. - pub async fn get_beacon_blocks_attestations( + pub async fn get_beacon_blocks_attestations_v1( &self, block_id: BlockId, ) -> Result>>>, Error> { @@ -1210,8 +1250,28 @@ impl BeaconNodeHttpClient { self.get_opt(path).await } - /// `POST beacon/pool/attestations` - pub async fn post_beacon_pool_attestations( + /// `GET v2/beacon/blocks/{block_id}/attestations` + /// + /// Returns `Ok(None)` on a 404 error. + pub async fn get_beacon_blocks_attestations_v2( + &self, + block_id: BlockId, + ) -> Result>>>, Error> + { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("blocks") + .push(&block_id.to_string()) + .push("attestations"); + + self.get_opt(path).await + } + + /// `POST v1/beacon/pool/attestations` + pub async fn post_beacon_pool_attestations_v1( &self, attestations: &[Attestation], ) -> Result<(), Error> { @@ -1229,8 +1289,33 @@ impl BeaconNodeHttpClient { Ok(()) } - /// `GET beacon/pool/attestations?slot,committee_index` - pub async fn get_beacon_pool_attestations( + /// `POST v2/beacon/pool/attestations` + pub async fn post_beacon_pool_attestations_v2( + &self, + attestations: &[Attestation], + fork_name: ForkName, + ) -> Result<(), Error> { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("attestations"); + + self.post_with_timeout_and_consensus_header( + path, + &attestations, + self.timeouts.attestation, + fork_name, + ) + .await?; + + Ok(()) + } + + /// `GET v1/beacon/pool/attestations?slot,committee_index` + pub async fn get_beacon_pool_attestations_v1( &self, slot: Option, committee_index: Option, @@ -1256,8 +1341,35 @@ impl BeaconNodeHttpClient { self.get(path).await } - /// `POST beacon/pool/attester_slashings` - pub async fn post_beacon_pool_attester_slashings( + /// `GET v2/beacon/pool/attestations?slot,committee_index` + pub async fn get_beacon_pool_attestations_v2( + &self, + slot: Option, + committee_index: Option, + ) -> Result>>, Error> { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("attestations"); + + if let Some(slot) = slot { + path.query_pairs_mut() + .append_pair("slot", &slot.to_string()); + } + + if let Some(index) = committee_index { + path.query_pairs_mut() + .append_pair("committee_index", &index.to_string()); + } + + self.get(path).await + } + + /// `POST v1/beacon/pool/attester_slashings` + pub async fn post_beacon_pool_attester_slashings_v1( &self, slashing: &AttesterSlashing, ) -> Result<(), Error> { @@ -1275,8 +1387,30 @@ impl BeaconNodeHttpClient { Ok(()) } - /// `GET beacon/pool/attester_slashings` - pub async fn get_beacon_pool_attester_slashings( + /// `POST v2/beacon/pool/attester_slashings` + pub async fn post_beacon_pool_attester_slashings_v2( + &self, + slashing: &AttesterSlashing, + fork_name: ForkName, + ) -> Result<(), Error> { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("attester_slashings"); + + self.post_generic_json_without_content_type_header_but_with_consensus_header( + path, slashing, None, fork_name, + ) + .await?; + + Ok(()) + } + + /// `GET v1/beacon/pool/attester_slashings` + pub async fn get_beacon_pool_attester_slashings_v1( &self, ) -> Result>>, Error> { let mut path = self.eth_path(V1)?; @@ -1290,6 +1424,21 @@ impl BeaconNodeHttpClient { self.get(path).await } + /// `GET v2/beacon/pool/attester_slashings` + pub async fn get_beacon_pool_attester_slashings_v2( + &self, + ) -> Result>>, Error> { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("attester_slashings"); + + self.get(path).await + } + /// `POST beacon/pool/proposer_slashings` pub async fn post_beacon_pool_proposer_slashings( &self, @@ -2324,8 +2473,8 @@ impl BeaconNodeHttpClient { .await } - /// `POST validator/aggregate_and_proofs` - pub async fn post_validator_aggregate_and_proof( + /// `POST v1/validator/aggregate_and_proofs` + pub async fn post_validator_aggregate_and_proof_v1( &self, aggregates: &[SignedAggregateAndProof], ) -> Result<(), Error> { @@ -2342,6 +2491,30 @@ impl BeaconNodeHttpClient { Ok(()) } + /// `POST v2/validator/aggregate_and_proofs` + pub async fn post_validator_aggregate_and_proof_v2( + &self, + aggregates: &[SignedAggregateAndProof], + fork_name: ForkName, + ) -> Result<(), Error> { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("aggregate_and_proofs"); + + self.post_with_timeout_and_consensus_header( + path, + &aggregates, + self.timeouts.attestation, + fork_name, + ) + .await?; + + Ok(()) + } + /// `POST validator/beacon_committee_subscriptions` pub async fn post_validator_beacon_committee_subscriptions( &self, diff --git a/consensus/types/src/attestation.rs b/consensus/types/src/attestation.rs index 88993267a9..1acaee9654 100644 --- a/consensus/types/src/attestation.rs +++ b/consensus/types/src/attestation.rs @@ -1,6 +1,6 @@ use crate::slot_data::SlotData; -use crate::Checkpoint; use crate::{test_utils::TestRandom, Hash256, Slot}; +use crate::{Checkpoint, ForkVersionDeserialize}; use derivative::Derivative; use safe_arith::ArithError; use serde::{Deserialize, Serialize}; @@ -487,6 +487,29 @@ impl<'a, E: EthSpec> From> for AttestationRef<'a, E> } } +impl ForkVersionDeserialize for Vec> { + fn deserialize_by_fork<'de, D: serde::Deserializer<'de>>( + value: serde_json::Value, + fork_name: crate::ForkName, + ) -> Result { + if fork_name.electra_enabled() { + let attestations: Vec> = + serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(attestations + .into_iter() + .map(Attestation::Electra) + .collect::>()) + } else { + let attestations: Vec> = + serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(attestations + .into_iter() + .map(Attestation::Base) + .collect::>()) + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/consensus/types/src/attester_slashing.rs b/consensus/types/src/attester_slashing.rs index a8d4e6989c..feb10506ef 100644 --- a/consensus/types/src/attester_slashing.rs +++ b/consensus/types/src/attester_slashing.rs @@ -160,6 +160,29 @@ impl AttesterSlashing { } } +impl crate::ForkVersionDeserialize for Vec> { + fn deserialize_by_fork<'de, D: serde::Deserializer<'de>>( + value: serde_json::Value, + fork_name: crate::ForkName, + ) -> Result { + if fork_name.electra_enabled() { + let slashings: Vec> = + serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(slashings + .into_iter() + .map(AttesterSlashing::Electra) + .collect::>()) + } else { + let slashings: Vec> = + serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(slashings + .into_iter() + .map(AttesterSlashing::Base) + .collect::>()) + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index db32dbcf4a..5167f8b649 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -14,7 +14,7 @@ use std::ops::Deref; use std::sync::Arc; use tokio::time::{sleep, sleep_until, Duration, Instant}; use tree_hash::TreeHash; -use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, ForkName, Slot}; +use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Slot}; /// Builds an `AttestationService`. pub struct AttestationServiceBuilder { @@ -449,6 +449,11 @@ impl AttestationService { warn!(log, "No attestations were published"); return Ok(None); } + let fork_name = self + .context + .eth2_config + .spec + .fork_name_at_slot::(attestation_data.slot); // Post the attestations to the BN. match self @@ -462,9 +467,15 @@ impl AttestationService { &metrics::ATTESTATION_SERVICE_TIMES, &[metrics::ATTESTATIONS_HTTP_POST], ); - beacon_node - .post_beacon_pool_attestations(attestations) - .await + if fork_name.electra_enabled() { + beacon_node + .post_beacon_pool_attestations_v2(attestations, fork_name) + .await + } else { + beacon_node + .post_beacon_pool_attestations_v1(attestations) + .await + } }, ) .await @@ -537,7 +548,7 @@ impl AttestationService { &metrics::ATTESTATION_SERVICE_TIMES, &[metrics::AGGREGATES_HTTP_GET], ); - let aggregate_attestation_result = if fork_name >= ForkName::Electra { + let aggregate_attestation_result = if fork_name.electra_enabled() { beacon_node .get_validator_aggregate_attestation_v2( attestation_data.slot, @@ -627,9 +638,20 @@ impl AttestationService { &metrics::ATTESTATION_SERVICE_TIMES, &[metrics::AGGREGATES_HTTP_POST], ); - beacon_node - .post_validator_aggregate_and_proof(signed_aggregate_and_proofs_slice) - .await + if fork_name.electra_enabled() { + beacon_node + .post_validator_aggregate_and_proof_v2( + signed_aggregate_and_proofs_slice, + fork_name, + ) + .await + } else { + beacon_node + .post_validator_aggregate_and_proof_v1( + signed_aggregate_and_proofs_slice, + ) + .await + } }, ) .await