udpates for aggregate attestation endpoint

This commit is contained in:
realbigsean
2024-05-08 22:18:07 -04:00
parent 7abb7621d5
commit c8fca4f1d0
5 changed files with 130 additions and 42 deletions

View File

@@ -256,6 +256,7 @@ pub fn prometheus_metrics() -> warp::filters::log::Log<impl Fn(warp::filters::lo
.or_else(|| starts_with("v1/validator/duties/sync")) .or_else(|| starts_with("v1/validator/duties/sync"))
.or_else(|| starts_with("v1/validator/attestation_data")) .or_else(|| starts_with("v1/validator/attestation_data"))
.or_else(|| starts_with("v1/validator/aggregate_attestation")) .or_else(|| starts_with("v1/validator/aggregate_attestation"))
.or_else(|| starts_with("v2/validator/aggregate_attestation"))
.or_else(|| starts_with("v1/validator/aggregate_and_proofs")) .or_else(|| starts_with("v1/validator/aggregate_and_proofs"))
.or_else(|| starts_with("v1/validator/sync_committee_contribution")) .or_else(|| starts_with("v1/validator/sync_committee_contribution"))
.or_else(|| starts_with("v1/validator/contribution_and_proofs")) .or_else(|| starts_with("v1/validator/contribution_and_proofs"))
@@ -3175,7 +3176,7 @@ pub fn serve<T: BeaconChainTypes>(
); );
// GET validator/aggregate_attestation?attestation_data_root,slot // GET validator/aggregate_attestation?attestation_data_root,slot
let get_validator_aggregate_attestation = eth_v1 let get_validator_aggregate_attestation = any_version
.and(warp::path("validator")) .and(warp::path("validator"))
.and(warp::path("aggregate_attestation")) .and(warp::path("aggregate_attestation"))
.and(warp::path::end()) .and(warp::path::end())
@@ -3184,11 +3185,24 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone()) .and(task_spawner_filter.clone())
.and(chain_filter.clone()) .and(chain_filter.clone())
.then( .then(
|query: api_types::ValidatorAggregateAttestationQuery, |endpoint_version: EndpointVersion,
query: api_types::ValidatorAggregateAttestationQuery,
not_synced_filter: Result<(), Rejection>, not_synced_filter: Result<(), Rejection>,
task_spawner: TaskSpawner<T::EthSpec>, task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| { chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P0, move || { task_spawner.blocking_json_task(Priority::P0, move || {
if endpoint_version == V2 {
if query.committee_index.is_none() {
return Err(warp_utils::reject::custom_bad_request(
"missing committee index".to_string(),
));
}
} else if endpoint_version == V1 {
// Do nothing
} else {
return Err(unsupported_version_rejection(endpoint_version));
}
//TODO(electra) pass the index into the next method.
not_synced_filter?; not_synced_filter?;
chain chain
.get_aggregated_attestation_by_slot_and_root( .get_aggregated_attestation_by_slot_and_root(

View File

@@ -3191,30 +3191,52 @@ impl ApiTester {
} }
pub async fn test_get_validator_aggregate_attestation(self) -> Self { pub async fn test_get_validator_aggregate_attestation(self) -> Self {
let attestation = self if self
.chain .chain
.head_beacon_block() .spec
.message() .fork_name_at_slot::<E>(self.chain.slot().unwrap())
.body() >= ForkName::Electra
.attestations() {
.next() for attestation in self.chain.naive_aggregation_pool.read().iter() {
.unwrap() let result = self
.clone_as_attestation(); .client
.get_validator_aggregate_attestation_v2(
attestation.data().slot,
attestation.data().tree_hash_root(),
attestation.committee_index(),
)
.await
.unwrap()
.unwrap()
.data;
let expected = attestation;
let result = self assert_eq!(&result, expected);
.client }
.get_validator_aggregate_attestation( } else {
attestation.data().slot, let attestation = self
attestation.data().tree_hash_root(), .chain
) .head_beacon_block()
.await .message()
.unwrap() .body()
.unwrap() .attestations()
.data; .next()
.unwrap()
.clone_as_attestation();
let result = self
.client
.get_validator_aggregate_attestation_v1(
attestation.data().slot,
attestation.data().tree_hash_root(),
)
.await
.unwrap()
.unwrap()
.data;
let expected = attestation;
let expected = attestation; assert_eq!(result, expected);
}
assert_eq!(result, expected);
self self
} }

View File

@@ -2179,8 +2179,8 @@ impl BeaconNodeHttpClient {
self.get_with_timeout(path, self.timeouts.attestation).await self.get_with_timeout(path, self.timeouts.attestation).await
} }
/// `GET validator/aggregate_attestation?slot,attestation_data_root` /// `GET v1/validator/aggregate_attestation?slot,attestation_data_root`
pub async fn get_validator_aggregate_attestation<E: EthSpec>( pub async fn get_validator_aggregate_attestation_v1<E: EthSpec>(
&self, &self,
slot: Slot, slot: Slot,
attestation_data_root: Hash256, attestation_data_root: Hash256,
@@ -2203,6 +2203,32 @@ impl BeaconNodeHttpClient {
.await .await
} }
/// `GET v2/validator/aggregate_attestation?slot,attestation_data_root,committee_index`
pub async fn get_validator_aggregate_attestation_v2<E: EthSpec>(
&self,
slot: Slot,
attestation_data_root: Hash256,
committee_index: CommitteeIndex,
) -> Result<Option<GenericResponse<Attestation<E>>>, Error> {
let mut path = self.eth_path(V2)?;
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("validator")
.push("aggregate_attestation");
path.query_pairs_mut()
.append_pair("slot", &slot.to_string())
.append_pair(
"attestation_data_root",
&format!("{:?}", attestation_data_root),
)
.append_pair("committee_index", &committee_index.to_string());
self.get_opt_with_timeout(path, self.timeouts.attestation)
.await
}
/// `GET validator/sync_committee_contribution` /// `GET validator/sync_committee_contribution`
pub async fn get_validator_sync_committee_contribution<E: EthSpec>( pub async fn get_validator_sync_committee_contribution<E: EthSpec>(
&self, &self,

View File

@@ -765,6 +765,8 @@ pub struct ValidatorAttestationDataQuery {
pub struct ValidatorAggregateAttestationQuery { pub struct ValidatorAggregateAttestationQuery {
pub attestation_data_root: Hash256, pub attestation_data_root: Hash256,
pub slot: Slot, pub slot: Slot,
#[serde(skip_serializing_if = "Option::is_none")]
pub committee_index: Option<CommitteeIndex>,
} }
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]

View File

@@ -14,6 +14,7 @@ use std::ops::Deref;
use std::sync::Arc; use std::sync::Arc;
use tokio::time::{sleep, sleep_until, Duration, Instant}; use tokio::time::{sleep, sleep_until, Duration, Instant};
use tree_hash::TreeHash; use tree_hash::TreeHash;
use types::ForkName;
use types::{ use types::{
attestation::AttestationBase, AggregateSignature, Attestation, AttestationData, BitList, attestation::AttestationBase, AggregateSignature, Attestation, AttestationData, BitList,
ChainSpec, CommitteeIndex, EthSpec, Slot, ChainSpec, CommitteeIndex, EthSpec, Slot,
@@ -290,17 +291,21 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
// Then download, sign and publish a `SignedAggregateAndProof` for each // Then download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and // validator that is elected to aggregate for this `slot` and
// `committee_index`. // `committee_index`.
self.produce_and_publish_aggregates(&attestation_data, &validator_duties) self.produce_and_publish_aggregates(
.await &attestation_data,
.map_err(move |e| { committee_index,
crit!( &validator_duties,
log, )
"Error during attestation routine"; .await
"error" => format!("{:?}", e), .map_err(move |e| {
"committee_index" => committee_index, crit!(
"slot" => slot.as_u64(), log,
) "Error during attestation routine";
})?; "error" => format!("{:?}", e),
"committee_index" => committee_index,
"slot" => slot.as_u64(),
)
})?;
} }
Ok(()) Ok(())
@@ -493,6 +498,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
async fn produce_and_publish_aggregates( async fn produce_and_publish_aggregates(
&self, &self,
attestation_data: &AttestationData, attestation_data: &AttestationData,
committee_index: CommitteeIndex,
validator_duties: &[DutyAndProof], validator_duties: &[DutyAndProof],
) -> Result<(), String> { ) -> Result<(), String> {
let log = self.context.log(); let log = self.context.log();
@@ -505,6 +511,12 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
return Ok(()); return Ok(());
} }
let fork_name = self
.context
.eth2_config
.spec
.fork_name_at_slot::<E>(attestation_data.slot);
let aggregated_attestation = &self let aggregated_attestation = &self
.beacon_nodes .beacon_nodes
.first_success( .first_success(
@@ -515,12 +527,24 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
&metrics::ATTESTATION_SERVICE_TIMES, &metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_GET], &[metrics::AGGREGATES_HTTP_GET],
); );
beacon_node let aggregate_attestation_result = if fork_name >= ForkName::Electra {
.get_validator_aggregate_attestation( beacon_node
attestation_data.slot, .get_validator_aggregate_attestation_v2(
attestation_data.tree_hash_root(), attestation_data.slot,
) attestation_data.tree_hash_root(),
.await committee_index,
)
.await
} else {
beacon_node
.get_validator_aggregate_attestation_v1(
attestation_data.slot,
attestation_data.tree_hash_root(),
)
.await
};
aggregate_attestation_result
.map_err(|e| { .map_err(|e| {
format!("Failed to produce an aggregate attestation: {:?}", e) format!("Failed to produce an aggregate attestation: {:?}", e)
})? })?