Add flag to disable attestation APIs

This commit is contained in:
Michael Sproul
2025-02-25 11:39:48 +11:00
parent 522b3cbaab
commit bbc1200b2d
5 changed files with 247 additions and 102 deletions

View File

@@ -1859,6 +1859,13 @@ pub fn serve<T: BeaconChainTypes>(
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
reprocess_tx: Option<Sender<ReprocessQueueMessage>>,
log: Logger| async move {
if chain.config.disable_attesting {
return convert_rejection::<Response<String>>(Err(
warp_utils::reject::custom_bad_request("Attesting disabled".to_string()),
))
.await;
}
let attestations = attestations.into_iter().map(Either::Left).collect();
let result = crate::publish_attestations::publish_attestations(
task_spawner,
@@ -1891,6 +1898,12 @@ pub fn serve<T: BeaconChainTypes>(
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
reprocess_tx: Option<Sender<ReprocessQueueMessage>>,
log: Logger| async move {
if chain.config.disable_attesting {
return convert_rejection::<Response<String>>(Err(
warp_utils::reject::custom_bad_request("Attesting disabled".to_string()),
))
.await;
}
let attestations =
match crate::publish_attestations::deserialize_attestation_payload::<T>(
payload, fork_name, &log,
@@ -1937,49 +1950,66 @@ pub fn serve<T: BeaconChainTypes>(
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query: api_types::AttestationPoolQuery| {
task_spawner.blocking_response_task(Priority::P1, move || {
let query_filter = |data: &AttestationData| {
query.slot.is_none_or(|slot| slot == data.slot)
&& query
.committee_index
.is_none_or(|index| index == data.index)
};
async move {
if chain.config.disable_attesting {
return convert_rejection::<Response<String>>(Err(
warp_utils::reject::custom_bad_request(
"Attesting disabled".to_string(),
),
))
.await;
}
task_spawner
.blocking_response_task(Priority::P1, move || {
let query_filter = |data: &AttestationData| {
query.slot.is_none_or(|slot| slot == data.slot)
&& query
.committee_index
.is_none_or(|index| index == data.index)
};
let mut attestations = chain.op_pool.get_filtered_attestations(query_filter);
attestations.extend(
chain
.naive_aggregation_pool
.read()
.iter()
.filter(|&att| query_filter(att.data()))
.cloned(),
);
// Use the current slot to find the fork version, and convert all messages to the
// current fork's format. This is to ensure consistent message types matching
// `Eth-Consensus-Version`.
let current_slot =
chain
.slot_clock
.now()
.ok_or(warp_utils::reject::custom_server_error(
"unable to read slot clock".to_string(),
))?;
let fork_name = chain.spec.fork_name_at_slot::<T::EthSpec>(current_slot);
let attestations = attestations
.into_iter()
.filter(|att| {
(fork_name.electra_enabled() && matches!(att, Attestation::Electra(_)))
|| (!fork_name.electra_enabled()
&& matches!(att, Attestation::Base(_)))
let mut attestations =
chain.op_pool.get_filtered_attestations(query_filter);
attestations.extend(
chain
.naive_aggregation_pool
.read()
.iter()
.filter(|&att| query_filter(att.data()))
.cloned(),
);
// Use the current slot to find the fork version, and convert all messages to the
// current fork's format. This is to ensure consistent message types matching
// `Eth-Consensus-Version`.
let current_slot = chain.slot_clock.now().ok_or(
warp_utils::reject::custom_server_error(
"unable to read slot clock".to_string(),
),
)?;
let fork_name =
chain.spec.fork_name_at_slot::<T::EthSpec>(current_slot);
let attestations = attestations
.into_iter()
.filter(|att| {
(fork_name.electra_enabled()
&& matches!(att, Attestation::Electra(_)))
|| (!fork_name.electra_enabled()
&& matches!(att, Attestation::Base(_)))
})
.collect::<Vec<_>>();
let res = fork_versioned_response(
endpoint_version,
fork_name,
&attestations,
)?;
Ok(add_consensus_version_header(
warp::reply::json(&res).into_response(),
fork_name,
))
})
.collect::<Vec<_>>();
let res = fork_versioned_response(endpoint_version, fork_name, &attestations)?;
Ok(add_consensus_version_header(
warp::reply::json(&res).into_response(),
fork_name,
))
})
.await
}
},
);
@@ -2200,12 +2230,24 @@ pub fn serve<T: BeaconChainTypes>(
signatures: Vec<SyncCommitteeMessage>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.blocking_json_task(Priority::P0, move || {
sync_committees::process_sync_committee_signatures(
signatures, network_tx, &chain, log,
)?;
Ok(api_types::GenericResponse::from(()))
})
async move {
if chain.config.disable_attesting {
return convert_rejection::<Response<String>>(Err(
warp_utils::reject::custom_bad_request(
"Attesting disabled".to_string(),
),
))
.await;
}
task_spawner
.blocking_json_task(Priority::P0, move || {
sync_committees::process_sync_committee_signatures(
signatures, network_tx, &chain, log,
)?;
Ok(api_types::GenericResponse::from(()))
})
.await
}
},
);
@@ -3419,10 +3461,22 @@ pub fn serve<T: BeaconChainTypes>(
indices: api_types::ValidatorIndexData,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
attester_duties::attester_duties(epoch, &indices.0, &chain)
})
async move {
if chain.config.disable_attesting {
return convert_rejection::<Response<String>>(Err(
warp_utils::reject::custom_bad_request(
"Attesting disabled".to_string(),
),
))
.await;
}
task_spawner
.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
attester_duties::attester_duties(epoch, &indices.0, &chain)
})
.await
}
},
);
@@ -3447,10 +3501,22 @@ pub fn serve<T: BeaconChainTypes>(
indices: api_types::ValidatorIndexData,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
sync_committees::sync_committee_duties(epoch, &indices.0, &chain)
})
async move {
if chain.config.disable_attesting {
return convert_rejection::<Response<String>>(Err(
warp_utils::reject::custom_bad_request(
"Attesting disabled".to_string(),
),
))
.await;
}
task_spawner
.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
sync_committees::sync_committee_duties(epoch, &indices.0, &chain)
})
.await
}
},
);
@@ -3468,23 +3534,35 @@ pub fn serve<T: BeaconChainTypes>(
not_synced_filter: Result<(), Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
chain
.get_aggregated_sync_committee_contribution(&sync_committee_data)
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!(
"unable to fetch sync contribution: {:?}",
e
))
})?
.map(api_types::GenericResponse::from)
.ok_or_else(|| {
warp_utils::reject::custom_not_found(
"no matching sync contribution found".to_string(),
)
async move {
if chain.config.disable_attesting {
return convert_rejection::<Response<String>>(Err(
warp_utils::reject::custom_bad_request(
"Attesting disabled".to_string(),
),
))
.await;
}
task_spawner
.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
chain
.get_aggregated_sync_committee_contribution(&sync_committee_data)
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!(
"unable to fetch sync contribution: {:?}",
e
))
})?
.map(api_types::GenericResponse::from)
.ok_or_else(|| {
warp_utils::reject::custom_not_found(
"no matching sync contribution found".to_string(),
)
})
})
})
.await
}
},
);
@@ -3509,6 +3587,15 @@ pub fn serve<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
aggregates: Vec<SignedAggregateAndProof<T::EthSpec>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>, log: Logger| {
async move {
if chain.config.disable_attesting {
return convert_rejection::<Response<String>>(Err(
warp_utils::reject::custom_bad_request(
"Attesting disabled".to_string(),
),
))
.await;
}
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
let seen_timestamp = timestamp_now();
@@ -3604,7 +3691,8 @@ pub fn serve<T: BeaconChainTypes>(
} else {
Ok(())
}
})
}).await
}
},
);
@@ -3625,36 +3713,58 @@ pub fn serve<T: BeaconChainTypes>(
contributions: Vec<SignedContributionAndProof<T::EthSpec>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
sync_committees::process_signed_contribution_and_proofs(
contributions,
network_tx,
&chain,
log,
)?;
Ok(api_types::GenericResponse::from(()))
})
async move {
if chain.config.disable_attesting {
return convert_rejection::<Response<String>>(Err(
warp_utils::reject::custom_bad_request(
"Attesting disabled".to_string(),
),
))
.await;
}
task_spawner
.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
sync_committees::process_signed_contribution_and_proofs(
contributions,
network_tx,
&chain,
log,
)?;
Ok(api_types::GenericResponse::from(()))
})
.await
}
},
);
// POST validator/beacon_committee_subscriptions
let post_validator_beacon_committee_subscriptions = eth_v1
.and(warp::path("validator"))
.and(warp::path("beacon_committee_subscriptions"))
.and(warp::path::end())
.and(warp_utils::json::json())
.and(validator_subscription_tx_filter.clone())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(log_filter.clone())
.then(
|subscriptions: Vec<api_types::BeaconCommitteeSubscription>,
validator_subscription_tx: Sender<ValidatorSubscriptionMessage>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
log: Logger| {
task_spawner.blocking_json_task(Priority::P0, move || {
let post_validator_beacon_committee_subscriptions =
eth_v1
.and(warp::path("validator"))
.and(warp::path("beacon_committee_subscriptions"))
.and(warp::path::end())
.and(warp_utils::json::json())
.and(validator_subscription_tx_filter.clone())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(log_filter.clone())
.then(
|subscriptions: Vec<api_types::BeaconCommitteeSubscription>,
validator_subscription_tx: Sender<ValidatorSubscriptionMessage>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
log: Logger| {
async move {
if chain.config.disable_attesting {
return convert_rejection::<Response<String>>(Err(
warp_utils::reject::custom_bad_request(
"Attesting disabled".to_string(),
),
))
.await;
}
task_spawner.blocking_json_task(Priority::P0, move || {
let subscriptions: std::collections::BTreeSet<_> = subscriptions
.iter()
.map(|subscription| {
@@ -3686,9 +3796,10 @@ pub fn serve<T: BeaconChainTypes>(
}
Ok(())
})
},
);
}).await
}
},
);
// POST validator/prepare_beacon_proposer
let post_validator_prepare_beacon_proposer = eth_v1
@@ -3945,6 +4056,15 @@ pub fn serve<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
log: Logger
| {
async move {
if chain.config.disable_attesting {
return convert_rejection::<Response<String>>(Err(
warp_utils::reject::custom_bad_request(
"Attesting disabled".to_string(),
),
))
.await;
}
task_spawner.blocking_json_task(Priority::P0, move || {
for subscription in subscriptions {
chain
@@ -3969,7 +4089,8 @@ pub fn serve<T: BeaconChainTypes>(
}
Ok(())
})
}).await
}
},
);