update GET requests

This commit is contained in:
realbigsean
2024-07-08 14:04:36 -07:00
parent 897f06a29c
commit 6766f329e3

View File

@@ -31,6 +31,7 @@ mod validators;
mod version;
use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3};
use crate::version::fork_versioned_response;
use beacon_chain::{
attestation_verification::VerifiedAttestation, observed_operations::ObservationOutcome,
validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError,
@@ -258,11 +259,13 @@ pub fn prometheus_metrics() -> warp::filters::log::Log<impl Fn(warp::filters::lo
.or_else(|| starts_with("v1/validator/aggregate_attestation"))
.or_else(|| starts_with("v2/validator/aggregate_attestation"))
.or_else(|| starts_with("v1/validator/aggregate_and_proofs"))
.or_else(|| starts_with("v2/validator/aggregate_and_proofs"))
.or_else(|| starts_with("v1/validator/sync_committee_contribution"))
.or_else(|| starts_with("v1/validator/contribution_and_proofs"))
.or_else(|| starts_with("v1/validator/beacon_committee_subscriptions"))
.or_else(|| starts_with("v1/validator/sync_committee_subscriptions"))
.or_else(|| starts_with("v1/beacon/pool/attestations"))
.or_else(|| starts_with("v2/beacon/pool/attestations"))
.or_else(|| starts_with("v1/beacon/pool/sync_committees"))
.or_else(|| starts_with("v1/beacon/blocks/head/root"))
.or_else(|| starts_with("v1/validator/prepare_beacon_proposer"))
@@ -1624,26 +1627,38 @@ pub fn serve<T: BeaconChainTypes>(
);
// GET beacon/blocks/{block_id}/attestations
let get_beacon_block_attestations = beacon_blocks_path_v1
let get_beacon_block_attestations = beacon_blocks_path_any
.clone()
.and(warp::path("attestations"))
.and(warp::path::end())
.then(
|block_id: BlockId,
|endpoint_version: EndpointVersion,
block_id: BlockId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
task_spawner.blocking_response_task(Priority::P1, move || {
let (block, execution_optimistic, finalized) =
block_id.blinded_block(&chain)?;
Ok(api_types::GenericResponse::from(
block
.message()
.body()
.attestations()
.map(|att| att.clone_as_attestation())
.collect::<Vec<_>>(),
)
.add_execution_optimistic_finalized(execution_optimistic, finalized))
let fork_name = block
.fork_name(&chain.spec)
.map_err(inconsistent_fork_rejection)?;
let atts = block
.message()
.body()
.attestations()
.map(|att| att.clone_as_attestation())
.collect::<Vec<_>>();
let res = execution_optimistic_finalized_fork_versioned_response(
endpoint_version,
fork_name,
execution_optimistic,
finalized,
&atts,
)?;
Ok(add_consensus_version_header(
warp::reply::json(&res).into_response(),
fork_name,
))
})
},
);
@@ -1751,8 +1766,14 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone());
let beacon_pool_path_any = any_version
.and(warp::path("beacon"))
.and(warp::path("pool"))
.and(task_spawner_filter.clone())
.and(chain_filter.clone());
// POST beacon/pool/attestations
let post_beacon_pool_attestations = beacon_pool_path
let post_beacon_pool_attestations = beacon_pool_path_any
.clone()
.and(warp::path("attestations"))
.and(warp::path::end())
@@ -1761,7 +1782,8 @@ pub fn serve<T: BeaconChainTypes>(
.and(reprocess_send_filter)
.and(log_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
|endpoint_version: EndpointVersion,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
attestations: Vec<Attestation<T::EthSpec>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
@@ -1782,16 +1804,17 @@ pub fn serve<T: BeaconChainTypes>(
);
// GET beacon/pool/attestations?committee_index,slot
let get_beacon_pool_attestations = beacon_pool_path
let get_beacon_pool_attestations = beacon_pool_path_any
.clone()
.and(warp::path("attestations"))
.and(warp::path::end())
.and(warp::query::<api_types::AttestationPoolQuery>())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
|endpoint_version: EndpointVersion,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query: api_types::AttestationPoolQuery| {
task_spawner.blocking_json_task(Priority::P1, move || {
task_spawner.blocking_response_task(Priority::P1, move || {
let query_filter = |data: &AttestationData| {
query.slot.map_or(true, |slot| slot == data.slot)
&& query
@@ -1808,20 +1831,37 @@ pub fn serve<T: BeaconChainTypes>(
.filter(|&att| query_filter(att.data()))
.cloned(),
);
Ok(api_types::GenericResponse::from(attestations))
let slot = query
.slot
.or_else(|| {
attestations
.first()
.map(|att| att.data().slot)
.or_else(|| chain.slot_clock.now())
})
.ok_or(warp_utils::reject::custom_server_error(
"unable to read slot clock".to_string(),
))?;
let fork_name = chain.spec.fork_name_at_slot::<T::EthSpec>(slot);
let res = fork_versioned_response(endpoint_version, fork_name, &attestations)?;
Ok(add_consensus_version_header(
warp::reply::json(&res).into_response(),
fork_name,
))
})
},
);
// POST beacon/pool/attester_slashings
let post_beacon_pool_attester_slashings = beacon_pool_path
let post_beacon_pool_attester_slashings = beacon_pool_path_any
.clone()
.and(warp::path("attester_slashings"))
.and(warp::path::end())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
|endpoint_version: EndpointVersion,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
slashing: AttesterSlashing<T::EthSpec>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
@@ -1858,15 +1898,29 @@ pub fn serve<T: BeaconChainTypes>(
);
// GET beacon/pool/attester_slashings
let get_beacon_pool_attester_slashings = beacon_pool_path
let get_beacon_pool_attester_slashings = beacon_pool_path_any
.clone()
.and(warp::path("attester_slashings"))
.and(warp::path::end())
.then(
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let attestations = chain.op_pool.get_all_attester_slashings();
Ok(api_types::GenericResponse::from(attestations))
|endpoint_version: EndpointVersion,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_response_task(Priority::P1, move || {
let slashings = chain.op_pool.get_all_attester_slashings();
let slot = slashings
.first()
.map(|slashing| slashing.attestation_1().data().slot)
.or_else(|| chain.slot_clock.now())
.ok_or(warp_utils::reject::custom_server_error(
"unable to read slot clock".to_string(),
))?;
let fork_name = chain.spec.fork_name_at_slot::<T::EthSpec>(slot);
let res = fork_versioned_response(endpoint_version, fork_name, &slashings)?;
Ok(add_consensus_version_header(
warp::reply::json(&res).into_response(),
fork_name,
))
})
},
);
@@ -3319,7 +3373,7 @@ pub fn serve<T: BeaconChainTypes>(
);
// POST validator/aggregate_and_proofs
let post_validator_aggregate_and_proofs = eth_v1
let post_validator_aggregate_and_proofs = any_version
.and(warp::path("validator"))
.and(warp::path("aggregate_and_proofs"))
.and(warp::path::end())
@@ -3330,7 +3384,8 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|not_synced_filter: Result<(), Rejection>,
|endpoint_version: EndpointVersion,
not_synced_filter: Result<(), Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
aggregates: Vec<SignedAggregateAndProof<T::EthSpec>>,