add client updates and test updates

This commit is contained in:
realbigsean
2024-07-08 18:19:10 -07:00
parent 39d41ada93
commit f405601d6f
7 changed files with 397 additions and 38 deletions

View File

@@ -150,8 +150,13 @@ async fn attestations_across_fork_with_skip_slots() {
.collect::<Vec<_>>();
assert!(!unaggregated_attestations.is_empty());
let fork_name = harness.spec.fork_name_at_slot::<E>(fork_slot);
client
.post_beacon_pool_attestations(&unaggregated_attestations)
.post_beacon_pool_attestations_v1(&unaggregated_attestations)
.await
.unwrap();
client
.post_beacon_pool_attestations_v2(&unaggregated_attestations, fork_name)
.await
.unwrap();
@@ -162,7 +167,11 @@ async fn attestations_across_fork_with_skip_slots() {
assert!(!signed_aggregates.is_empty());
client
.post_validator_aggregate_and_proof(&signed_aggregates)
.post_validator_aggregate_and_proof_v1(&signed_aggregates)
.await
.unwrap();
client
.post_validator_aggregate_and_proof_v2(&signed_aggregates, fork_name)
.await
.unwrap();
}

View File

@@ -890,9 +890,10 @@ async fn queue_attestations_from_http() {
.flat_map(|attestations| attestations.into_iter().map(|(att, _subnet)| att))
.collect::<Vec<_>>();
let fork_name = tester.harness.spec.fork_name_at_slot::<E>(attestation_slot);
let attestation_future = tokio::spawn(async move {
client
.post_beacon_pool_attestations(&attestations)
.post_beacon_pool_attestations_v2(&attestations, fork_name)
.await
.expect("attestations should be processed successfully")
});

View File

@@ -1633,7 +1633,7 @@ impl ApiTester {
for block_id in self.interesting_block_ids() {
let result = self
.client
.get_beacon_blocks_attestations(block_id.0)
.get_beacon_blocks_attestations_v2(block_id.0)
.await
.unwrap()
.map(|res| res.data);
@@ -1666,7 +1666,7 @@ impl ApiTester {
pub async fn test_post_beacon_pool_attestations_valid(mut self) -> Self {
self.client
.post_beacon_pool_attestations(self.attestations.as_slice())
.post_beacon_pool_attestations_v1(self.attestations.as_slice())
.await
.unwrap();
@@ -1675,6 +1675,20 @@ impl ApiTester {
"valid attestation should be sent to network"
);
let fork_name = self
.attestations
.first()
.map(|att| self.chain.spec.fork_name_at_slot::<E>(att.data().slot))
.unwrap();
self.client
.post_beacon_pool_attestations_v2(self.attestations.as_slice(), fork_name)
.await
.unwrap();
assert!(
self.network_rx.network_recv.recv().await.is_some(),
"valid attestation should be sent to network"
);
self
}
@@ -1691,7 +1705,7 @@ impl ApiTester {
let err = self
.client
.post_beacon_pool_attestations(attestations.as_slice())
.post_beacon_pool_attestations_v1(attestations.as_slice())
.await
.unwrap_err();
@@ -1712,6 +1726,35 @@ impl ApiTester {
"if some attestations are valid, we should send them to the network"
);
let fork_name = self
.attestations
.first()
.map(|att| self.chain.spec.fork_name_at_slot::<E>(att.data().slot))
.unwrap();
let err_v2 = self
.client
.post_beacon_pool_attestations_v2(attestations.as_slice(), fork_name)
.await
.unwrap_err();
match err_v2 {
Error::ServerIndexedMessage(IndexedErrorMessage {
code,
message: _,
failures,
}) => {
assert_eq!(code, 400);
assert_eq!(failures.len(), self.attestations.len());
}
_ => panic!("query did not fail correctly"),
}
assert!(
self.network_rx.network_recv.recv().await.is_some(),
"if some attestations are valid, we should send them to the network"
);
self
}
@@ -1777,7 +1820,7 @@ impl ApiTester {
pub async fn test_get_beacon_pool_attestations(self) -> Self {
let result = self
.client
.get_beacon_pool_attestations(None, None)
.get_beacon_pool_attestations_v1(None, None)
.await
.unwrap()
.data;
@@ -1787,12 +1830,34 @@ impl ApiTester {
assert_eq!(result, expected);
let result = self
.client
.get_beacon_pool_attestations_v2(None, None)
.await
.unwrap()
.data;
assert_eq!(result, expected);
self
}
pub async fn test_post_beacon_pool_attester_slashings_valid(mut self) -> Self {
self.client
.post_beacon_pool_attester_slashings(&self.attester_slashing)
.post_beacon_pool_attester_slashings_v1(&self.attester_slashing)
.await
.unwrap();
assert!(
self.network_rx.network_recv.recv().await.is_some(),
"valid attester slashing should be sent to network"
);
let fork_name = self
.chain
.spec
.fork_name_at_slot::<E>(self.attester_slashing.attestation_1().data().slot);
self.client
.post_beacon_pool_attester_slashings_v2(&self.attester_slashing, fork_name)
.await
.unwrap();
@@ -1816,7 +1881,21 @@ impl ApiTester {
}
self.client
.post_beacon_pool_attester_slashings(&slashing)
.post_beacon_pool_attester_slashings_v1(&slashing)
.await
.unwrap_err();
assert!(
self.network_rx.network_recv.recv().now_or_never().is_none(),
"invalid attester slashing should not be sent to network"
);
let fork_name = self
.chain
.spec
.fork_name_at_slot::<E>(self.attester_slashing.attestation_1().data().slot);
self.client
.post_beacon_pool_attester_slashings_v2(&slashing, fork_name)
.await
.unwrap_err();
@@ -1831,7 +1910,7 @@ impl ApiTester {
pub async fn test_get_beacon_pool_attester_slashings(self) -> Self {
let result = self
.client
.get_beacon_pool_attester_slashings()
.get_beacon_pool_attester_slashings_v1()
.await
.unwrap()
.data;
@@ -1840,6 +1919,14 @@ impl ApiTester {
assert_eq!(result, expected);
let result = self
.client
.get_beacon_pool_attester_slashings_v2()
.await
.unwrap()
.data;
assert_eq!(result, expected);
self
}
@@ -3210,7 +3297,7 @@ impl ApiTester {
.get_validator_aggregate_attestation_v2(
attestation.data().slot,
attestation.data().tree_hash_root(),
attestation.committee_index(),
attestation.committee_index().unwrap(),
)
.await
.unwrap()
@@ -3346,7 +3433,18 @@ impl ApiTester {
let aggregate = self.get_aggregate().await;
self.client
.post_validator_aggregate_and_proof::<E>(&[aggregate])
.post_validator_aggregate_and_proof_v1::<E>(&[aggregate.clone()])
.await
.unwrap();
assert!(self.network_rx.network_recv.recv().await.is_some());
let fork_name = self
.chain
.spec
.fork_name_at_slot::<E>(aggregate.message().aggregate().data().slot);
self.client
.post_validator_aggregate_and_proof_v2::<E>(&[aggregate], fork_name)
.await
.unwrap();
@@ -3367,12 +3465,22 @@ impl ApiTester {
}
self.client
.post_validator_aggregate_and_proof::<E>(&[aggregate])
.post_validator_aggregate_and_proof_v1::<E>(&[aggregate.clone()])
.await
.unwrap_err();
assert!(self.network_rx.network_recv.recv().now_or_never().is_none());
let fork_name = self
.chain
.spec
.fork_name_at_slot::<E>(aggregate.message().aggregate().data().slot);
self.client
.post_validator_aggregate_and_proof_v2::<E>(&[aggregate], fork_name)
.await
.unwrap();
assert!(self.network_rx.network_recv.recv().now_or_never().is_none());
self
}
@@ -3471,7 +3579,7 @@ impl ApiTester {
pub async fn test_post_validator_register_validator_slashed(self) -> Self {
// slash a validator
self.client
.post_beacon_pool_attester_slashings(&self.attester_slashing)
.post_beacon_pool_attester_slashings_v1(&self.attester_slashing)
.await
.unwrap();
@@ -3584,7 +3692,7 @@ impl ApiTester {
// Attest to the current slot
self.client
.post_beacon_pool_attestations(self.attestations.as_slice())
.post_beacon_pool_attestations_v1(self.attestations.as_slice())
.await
.unwrap();
@@ -5224,7 +5332,7 @@ impl ApiTester {
// Attest to the current slot
self.client
.post_beacon_pool_attestations(self.attestations.as_slice())
.post_beacon_pool_attestations_v1(self.attestations.as_slice())
.await
.unwrap();
@@ -5279,7 +5387,7 @@ impl ApiTester {
let expected_attestation_len = self.attestations.len();
self.client
.post_beacon_pool_attestations(self.attestations.as_slice())
.post_beacon_pool_attestations_v1(self.attestations.as_slice())
.await
.unwrap();

View File

@@ -346,6 +346,19 @@ impl BeaconNodeHttpClient {
Ok(())
}
/// Perform a HTTP POST request with a custom timeout.
async fn post_with_timeout_and_consensus_header<T: Serialize, U: IntoUrl>(
&self,
url: U,
body: &T,
timeout: Duration,
fork_name: ForkName,
) -> Result<(), Error> {
self.post_generic_with_consensus_version(url, body, Some(timeout), fork_name)
.await?;
Ok(())
}
/// Perform a HTTP POST request with a custom timeout, returning a JSON response.
async fn post_with_timeout_and_response<T: DeserializeOwned, U: IntoUrl, V: Serialize>(
&self,
@@ -395,6 +408,33 @@ impl BeaconNodeHttpClient {
ok_or_error(response).await
}
/// Generic POST function supporting arbitrary responses and timeouts.
/// Does not include Content-Type application/json in the request header.
async fn post_generic_json_without_content_type_header_but_with_consensus_header<
T: Serialize,
U: IntoUrl,
>(
&self,
url: U,
body: &T,
timeout: Option<Duration>,
fork: ForkName,
) -> Result<Response, Error> {
let mut builder = self.client.post(url);
if let Some(timeout) = timeout {
builder = builder.timeout(timeout);
}
let serialized_body = serde_json::to_vec(body).map_err(Error::InvalidJson)?;
let response = builder
.header(CONSENSUS_VERSION_HEADER, fork.to_string())
.body(serialized_body)
.send()
.await?;
ok_or_error(response).await
}
/// Generic POST function supporting arbitrary responses and timeouts.
async fn post_generic_with_consensus_version<T: Serialize, U: IntoUrl>(
&self,
@@ -1191,10 +1231,10 @@ impl BeaconNodeHttpClient {
self.get_opt(path).await
}
/// `GET beacon/blocks/{block_id}/attestations`
/// `GET v1/beacon/blocks/{block_id}/attestations`
///
/// Returns `Ok(None)` on a 404 error.
pub async fn get_beacon_blocks_attestations<E: EthSpec>(
pub async fn get_beacon_blocks_attestations_v1<E: EthSpec>(
&self,
block_id: BlockId,
) -> Result<Option<ExecutionOptimisticFinalizedResponse<Vec<Attestation<E>>>>, Error> {
@@ -1210,8 +1250,28 @@ impl BeaconNodeHttpClient {
self.get_opt(path).await
}
/// `POST beacon/pool/attestations`
pub async fn post_beacon_pool_attestations<E: EthSpec>(
/// `GET v2/beacon/blocks/{block_id}/attestations`
///
/// Returns `Ok(None)` on a 404 error.
pub async fn get_beacon_blocks_attestations_v2<E: EthSpec>(
&self,
block_id: BlockId,
) -> Result<Option<ExecutionOptimisticFinalizedForkVersionedResponse<Vec<Attestation<E>>>>, Error>
{
let mut path = self.eth_path(V2)?;
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("beacon")
.push("blocks")
.push(&block_id.to_string())
.push("attestations");
self.get_opt(path).await
}
/// `POST v1/beacon/pool/attestations`
pub async fn post_beacon_pool_attestations_v1<E: EthSpec>(
&self,
attestations: &[Attestation<E>],
) -> Result<(), Error> {
@@ -1229,8 +1289,33 @@ impl BeaconNodeHttpClient {
Ok(())
}
/// `GET beacon/pool/attestations?slot,committee_index`
pub async fn get_beacon_pool_attestations<E: EthSpec>(
/// `POST v2/beacon/pool/attestations`
pub async fn post_beacon_pool_attestations_v2<E: EthSpec>(
&self,
attestations: &[Attestation<E>],
fork_name: ForkName,
) -> Result<(), Error> {
let mut path = self.eth_path(V2)?;
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("beacon")
.push("pool")
.push("attestations");
self.post_with_timeout_and_consensus_header(
path,
&attestations,
self.timeouts.attestation,
fork_name,
)
.await?;
Ok(())
}
/// `GET v1/beacon/pool/attestations?slot,committee_index`
pub async fn get_beacon_pool_attestations_v1<E: EthSpec>(
&self,
slot: Option<Slot>,
committee_index: Option<u64>,
@@ -1256,8 +1341,35 @@ impl BeaconNodeHttpClient {
self.get(path).await
}
/// `POST beacon/pool/attester_slashings`
pub async fn post_beacon_pool_attester_slashings<E: EthSpec>(
/// `GET v2/beacon/pool/attestations?slot,committee_index`
pub async fn get_beacon_pool_attestations_v2<E: EthSpec>(
&self,
slot: Option<Slot>,
committee_index: Option<u64>,
) -> Result<ForkVersionedResponse<Vec<Attestation<E>>>, Error> {
let mut path = self.eth_path(V2)?;
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("beacon")
.push("pool")
.push("attestations");
if let Some(slot) = slot {
path.query_pairs_mut()
.append_pair("slot", &slot.to_string());
}
if let Some(index) = committee_index {
path.query_pairs_mut()
.append_pair("committee_index", &index.to_string());
}
self.get(path).await
}
/// `POST v1/beacon/pool/attester_slashings`
pub async fn post_beacon_pool_attester_slashings_v1<E: EthSpec>(
&self,
slashing: &AttesterSlashing<E>,
) -> Result<(), Error> {
@@ -1275,8 +1387,30 @@ impl BeaconNodeHttpClient {
Ok(())
}
/// `GET beacon/pool/attester_slashings`
pub async fn get_beacon_pool_attester_slashings<E: EthSpec>(
/// `POST v2/beacon/pool/attester_slashings`
pub async fn post_beacon_pool_attester_slashings_v2<E: EthSpec>(
&self,
slashing: &AttesterSlashing<E>,
fork_name: ForkName,
) -> Result<(), Error> {
let mut path = self.eth_path(V2)?;
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("beacon")
.push("pool")
.push("attester_slashings");
self.post_generic_json_without_content_type_header_but_with_consensus_header(
path, slashing, None, fork_name,
)
.await?;
Ok(())
}
/// `GET v1/beacon/pool/attester_slashings`
pub async fn get_beacon_pool_attester_slashings_v1<E: EthSpec>(
&self,
) -> Result<GenericResponse<Vec<AttesterSlashing<E>>>, Error> {
let mut path = self.eth_path(V1)?;
@@ -1290,6 +1424,21 @@ impl BeaconNodeHttpClient {
self.get(path).await
}
/// `GET v2/beacon/pool/attester_slashings`
pub async fn get_beacon_pool_attester_slashings_v2<E: EthSpec>(
&self,
) -> Result<ForkVersionedResponse<Vec<AttesterSlashing<E>>>, Error> {
let mut path = self.eth_path(V2)?;
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("beacon")
.push("pool")
.push("attester_slashings");
self.get(path).await
}
/// `POST beacon/pool/proposer_slashings`
pub async fn post_beacon_pool_proposer_slashings(
&self,
@@ -2324,8 +2473,8 @@ impl BeaconNodeHttpClient {
.await
}
/// `POST validator/aggregate_and_proofs`
pub async fn post_validator_aggregate_and_proof<E: EthSpec>(
/// `POST v1/validator/aggregate_and_proofs`
pub async fn post_validator_aggregate_and_proof_v1<E: EthSpec>(
&self,
aggregates: &[SignedAggregateAndProof<E>],
) -> Result<(), Error> {
@@ -2342,6 +2491,30 @@ impl BeaconNodeHttpClient {
Ok(())
}
/// `POST v2/validator/aggregate_and_proofs`
pub async fn post_validator_aggregate_and_proof_v2<E: EthSpec>(
&self,
aggregates: &[SignedAggregateAndProof<E>],
fork_name: ForkName,
) -> Result<(), Error> {
let mut path = self.eth_path(V2)?;
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("validator")
.push("aggregate_and_proofs");
self.post_with_timeout_and_consensus_header(
path,
&aggregates,
self.timeouts.attestation,
fork_name,
)
.await?;
Ok(())
}
/// `POST validator/beacon_committee_subscriptions`
pub async fn post_validator_beacon_committee_subscriptions(
&self,

View File

@@ -1,6 +1,6 @@
use crate::slot_data::SlotData;
use crate::Checkpoint;
use crate::{test_utils::TestRandom, Hash256, Slot};
use crate::{Checkpoint, ForkVersionDeserialize};
use derivative::Derivative;
use safe_arith::ArithError;
use serde::{Deserialize, Serialize};
@@ -487,6 +487,29 @@ impl<'a, E: EthSpec> From<AttestationRefOnDisk<'a, E>> for AttestationRef<'a, E>
}
}
impl<E: EthSpec> ForkVersionDeserialize for Vec<Attestation<E>> {
fn deserialize_by_fork<'de, D: serde::Deserializer<'de>>(
value: serde_json::Value,
fork_name: crate::ForkName,
) -> Result<Self, D::Error> {
if fork_name.electra_enabled() {
let attestations: Vec<AttestationElectra<E>> =
serde_json::from_value(value).map_err(serde::de::Error::custom)?;
Ok(attestations
.into_iter()
.map(Attestation::Electra)
.collect::<Vec<_>>())
} else {
let attestations: Vec<AttestationBase<E>> =
serde_json::from_value(value).map_err(serde::de::Error::custom)?;
Ok(attestations
.into_iter()
.map(Attestation::Base)
.collect::<Vec<_>>())
}
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -160,6 +160,29 @@ impl<E: EthSpec> AttesterSlashing<E> {
}
}
impl<E: EthSpec> crate::ForkVersionDeserialize for Vec<AttesterSlashing<E>> {
fn deserialize_by_fork<'de, D: serde::Deserializer<'de>>(
value: serde_json::Value,
fork_name: crate::ForkName,
) -> Result<Self, D::Error> {
if fork_name.electra_enabled() {
let slashings: Vec<AttesterSlashingElectra<E>> =
serde_json::from_value(value).map_err(serde::de::Error::custom)?;
Ok(slashings
.into_iter()
.map(AttesterSlashing::Electra)
.collect::<Vec<_>>())
} else {
let slashings: Vec<AttesterSlashingBase<E>> =
serde_json::from_value(value).map_err(serde::de::Error::custom)?;
Ok(slashings
.into_iter()
.map(AttesterSlashing::Base)
.collect::<Vec<_>>())
}
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -14,7 +14,7 @@ use std::ops::Deref;
use std::sync::Arc;
use tokio::time::{sleep, sleep_until, Duration, Instant};
use tree_hash::TreeHash;
use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, ForkName, Slot};
use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Slot};
/// Builds an `AttestationService`.
pub struct AttestationServiceBuilder<T: SlotClock + 'static, E: EthSpec> {
@@ -449,6 +449,11 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
warn!(log, "No attestations were published");
return Ok(None);
}
let fork_name = self
.context
.eth2_config
.spec
.fork_name_at_slot::<E>(attestation_data.slot);
// Post the attestations to the BN.
match self
@@ -462,9 +467,15 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::ATTESTATIONS_HTTP_POST],
);
beacon_node
.post_beacon_pool_attestations(attestations)
.await
if fork_name.electra_enabled() {
beacon_node
.post_beacon_pool_attestations_v2(attestations, fork_name)
.await
} else {
beacon_node
.post_beacon_pool_attestations_v1(attestations)
.await
}
},
)
.await
@@ -537,7 +548,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_GET],
);
let aggregate_attestation_result = if fork_name >= ForkName::Electra {
let aggregate_attestation_result = if fork_name.electra_enabled() {
beacon_node
.get_validator_aggregate_attestation_v2(
attestation_data.slot,
@@ -627,9 +638,20 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_POST],
);
beacon_node
.post_validator_aggregate_and_proof(signed_aggregate_and_proofs_slice)
.await
if fork_name.electra_enabled() {
beacon_node
.post_validator_aggregate_and_proof_v2(
signed_aggregate_and_proofs_slice,
fork_name,
)
.await
} else {
beacon_node
.post_validator_aggregate_and_proof_v1(
signed_aggregate_and_proofs_slice,
)
.await
}
},
)
.await