Return not synced errors for endpoints that require syncing (#5136)

* add not synced filter into then blocks

* refactor
This commit is contained in:
Eitan Seri-Levi
2024-04-04 04:36:23 +03:00
committed by GitHub
parent 7825af4a6e
commit f4cdcea7b1

View File

@@ -96,7 +96,7 @@ use warp::http::StatusCode;
use warp::hyper::Body;
use warp::sse::Event;
use warp::Reply;
use warp::{http::Response, Filter};
use warp::{http::Response, Filter, Rejection};
use warp_utils::{query::multi_key_query, uor::UnifyingOrFilter};
const API_PREFIX: &str = "eth";
@@ -453,7 +453,7 @@ pub fn serve<T: BeaconChainTypes>(
warp::any()
.and(network_globals.clone())
.and(chain_filter.clone())
.and_then(
.then(
move |network_globals: Arc<NetworkGlobals<T::EthSpec>>,
chain: Arc<BeaconChain<T>>| async move {
match *network_globals.sync_state.read() {
@@ -488,8 +488,7 @@ pub fn serve<T: BeaconChainTypes>(
)),
}
},
)
.untuple_one();
);
// Create a `warp` filter that provides access to the logger.
let inner_ctx = ctx.clone();
@@ -3058,10 +3057,12 @@ pub fn serve<T: BeaconChainTypes>(
.and(log_filter.clone())
.then(
|epoch: Epoch,
not_synced_filter: Result<(), Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
log: Logger| {
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
proposer_duties::proposer_duties(epoch, &chain, &log)
})
},
@@ -3087,6 +3088,7 @@ pub fn serve<T: BeaconChainTypes>(
|endpoint_version: EndpointVersion,
slot: Slot,
accept_header: Option<api_types::Accept>,
not_synced_filter: Result<(), Rejection>,
query: api_types::ValidatorBlocksQuery,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
@@ -3098,6 +3100,8 @@ pub fn serve<T: BeaconChainTypes>(
"slot" => slot
);
not_synced_filter?;
if endpoint_version == V3 {
produce_block_v3(accept_header, chain, slot, query).await
} else {
@@ -3124,11 +3128,13 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.then(
|slot: Slot,
not_synced_filter: Result<(), Rejection>,
query: api_types::ValidatorBlocksQuery,
accept_header: Option<api_types::Accept>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
not_synced_filter?;
produce_blinded_block_v2(EndpointVersion(2), accept_header, chain, slot, query)
.await
})
@@ -3146,9 +3152,12 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.then(
|query: api_types::ValidatorAttestationDataQuery,
not_synced_filter: Result<(), Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
let current_slot = chain
.slot()
.map_err(warp_utils::reject::beacon_chain_error)?;
@@ -3181,9 +3190,11 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.then(
|query: api_types::ValidatorAggregateAttestationQuery,
not_synced_filter: Result<(), Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
chain
.get_aggregated_attestation_by_slot_and_root(
query.slot,
@@ -3222,10 +3233,12 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.then(
|epoch: Epoch,
not_synced_filter: Result<(), Rejection>,
indices: api_types::ValidatorIndexData,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
attester_duties::attester_duties(epoch, &indices.0, &chain)
})
},
@@ -3248,10 +3261,12 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.then(
|epoch: Epoch,
not_synced_filter: Result<(), Rejection>,
indices: api_types::ValidatorIndexData,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
sync_committees::sync_committee_duties(epoch, &indices.0, &chain)
})
},
@@ -3268,9 +3283,11 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.then(
|sync_committee_data: SyncContributionData,
not_synced_filter: Result<(), Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
chain
.get_aggregated_sync_committee_contribution(&sync_committee_data)
.map_err(|e| {
@@ -3301,11 +3318,13 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
|not_synced_filter: Result<(), Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
aggregates: Vec<SignedAggregateAndProof<T::EthSpec>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>, log: Logger| {
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
let seen_timestamp = timestamp_now();
let mut verified_aggregates = Vec::with_capacity(aggregates.len());
let mut messages = Vec::with_capacity(aggregates.len());
@@ -3414,12 +3433,14 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter)
.and(log_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
|not_synced_filter: Result<(), Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
contributions: Vec<SignedContributionAndProof<T::EthSpec>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
sync_committees::process_signed_contribution_and_proofs(
contributions,
network_tx,
@@ -3494,11 +3515,13 @@ pub fn serve<T: BeaconChainTypes>(
.and(log_filter.clone())
.and(warp_utils::json::json())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
|not_synced_filter: Result<(), Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
log: Logger,
preparation_data: Vec<ProposerPreparationData>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
not_synced_filter?;
let execution_layer = chain
.execution_layer
.as_ref()
@@ -4197,8 +4220,11 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
|not_synced_filter: Result<(), Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
not_synced_filter?;
chain.store_migrator.process_reconstruction();
Ok("success")
})