Move beacon state endpoints to a separate module. (#8529)

Part of the http api refactor to move endpoint handlers to separate modules.

This should improve code maintainability, incremental compilation time and rust analyzer performance.


  


Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>
This commit is contained in:
Jimmy Chen
2025-12-04 17:58:57 +11:00
committed by GitHub
parent 41ba135034
commit 51d0336020
4 changed files with 830 additions and 666 deletions

View File

@@ -0,0 +1 @@
pub mod states;

View File

@@ -0,0 +1,787 @@
use crate::StateId;
use crate::task_spawner::{Priority, TaskSpawner};
use crate::utils::ResponseFilter;
use crate::validator::pubkey_to_validator_index;
use crate::version::{
ResponseIncludesVersion, add_consensus_version_header,
execution_optimistic_finalized_beacon_response,
};
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes, WhenSlotSkipped};
use eth2::types::{
ValidatorBalancesRequestBody, ValidatorId, ValidatorIdentitiesRequestBody,
ValidatorsRequestBody,
};
use std::sync::Arc;
use types::{
AttestationShufflingId, CommitteeCache, Error as BeaconStateError, EthSpec, RelativeEpoch,
};
use warp::filters::BoxedFilter;
use warp::{Filter, Reply};
use warp_utils::query::multi_key_query;
type BeaconStatesPath<T> = BoxedFilter<(
StateId,
TaskSpawner<<T as BeaconChainTypes>::EthSpec>,
Arc<BeaconChain<T>>,
)>;
// GET beacon/states/{state_id}/pending_consolidations
pub fn get_beacon_state_pending_consolidations<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>,
) -> ResponseFilter {
beacon_states_path
.clone()
.and(warp::path("pending_consolidations"))
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_response_task(Priority::P1, move || {
let (data, execution_optimistic, finalized, fork_name) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
let Ok(consolidations) = state.pending_consolidations() else {
return Err(warp_utils::reject::custom_bad_request(
"Pending consolidations not found".to_string(),
));
};
Ok((
consolidations.clone(),
execution_optimistic,
finalized,
state.fork_name_unchecked(),
))
},
)?;
execution_optimistic_finalized_beacon_response(
ResponseIncludesVersion::Yes(fork_name),
execution_optimistic,
finalized,
data,
)
.map(|res| warp::reply::json(&res).into_response())
.map(|resp| add_consensus_version_header(resp, fork_name))
})
},
)
.boxed()
}
// GET beacon/states/{state_id}/pending_partial_withdrawals
pub fn get_beacon_state_pending_partial_withdrawals<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>,
) -> ResponseFilter {
beacon_states_path
.clone()
.and(warp::path("pending_partial_withdrawals"))
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_response_task(Priority::P1, move || {
let (data, execution_optimistic, finalized, fork_name) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
let Ok(withdrawals) = state.pending_partial_withdrawals() else {
return Err(warp_utils::reject::custom_bad_request(
"Pending withdrawals not found".to_string(),
));
};
Ok((
withdrawals.clone(),
execution_optimistic,
finalized,
state.fork_name_unchecked(),
))
},
)?;
execution_optimistic_finalized_beacon_response(
ResponseIncludesVersion::Yes(fork_name),
execution_optimistic,
finalized,
data,
)
.map(|res| warp::reply::json(&res).into_response())
.map(|resp| add_consensus_version_header(resp, fork_name))
})
},
)
.boxed()
}
// GET beacon/states/{state_id}/pending_deposits
pub fn get_beacon_state_pending_deposits<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>,
) -> ResponseFilter {
beacon_states_path
.clone()
.and(warp::path("pending_deposits"))
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_response_task(Priority::P1, move || {
let (data, execution_optimistic, finalized, fork_name) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
let Ok(deposits) = state.pending_deposits() else {
return Err(warp_utils::reject::custom_bad_request(
"Pending deposits not found".to_string(),
));
};
Ok((
deposits.clone(),
execution_optimistic,
finalized,
state.fork_name_unchecked(),
))
},
)?;
execution_optimistic_finalized_beacon_response(
ResponseIncludesVersion::Yes(fork_name),
execution_optimistic,
finalized,
data,
)
.map(|res| warp::reply::json(&res).into_response())
.map(|resp| add_consensus_version_header(resp, fork_name))
})
},
)
.boxed()
}
// GET beacon/states/{state_id}/randao?epoch
pub fn get_beacon_state_randao<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>,
) -> ResponseFilter {
beacon_states_path
.clone()
.and(warp::path("randao"))
.and(warp::query::<eth2::types::RandaoQuery>())
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query: eth2::types::RandaoQuery| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (randao, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
let epoch = query.epoch.unwrap_or_else(|| state.current_epoch());
let randao = *state.get_randao_mix(epoch).map_err(|e| {
warp_utils::reject::custom_bad_request(format!(
"epoch out of range: {e:?}"
))
})?;
Ok((randao, execution_optimistic, finalized))
},
)?;
Ok(
eth2::types::GenericResponse::from(eth2::types::RandaoMix { randao })
.add_execution_optimistic_finalized(execution_optimistic, finalized),
)
})
},
)
.boxed()
}
// GET beacon/states/{state_id}/sync_committees?epoch
pub fn get_beacon_state_sync_committees<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>,
) -> ResponseFilter {
beacon_states_path
.clone()
.and(warp::path("sync_committees"))
.and(warp::query::<eth2::types::SyncCommitteesQuery>())
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query: eth2::types::SyncCommitteesQuery| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (sync_committee, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
let current_epoch = state.current_epoch();
let epoch = query.epoch.unwrap_or(current_epoch);
Ok((
state
.get_built_sync_committee(epoch, &chain.spec)
.cloned()
.map_err(|e| match e {
BeaconStateError::SyncCommitteeNotKnown { .. } => {
warp_utils::reject::custom_bad_request(format!(
"state at epoch {} has no \
sync committee for epoch {}",
current_epoch, epoch
))
}
BeaconStateError::IncorrectStateVariant => {
warp_utils::reject::custom_bad_request(format!(
"state at epoch {} is not activated for Altair",
current_epoch,
))
}
e => warp_utils::reject::beacon_state_error(e),
})?,
execution_optimistic,
finalized,
))
},
)?;
let validators = chain
.validator_indices(sync_committee.pubkeys.iter())
.map_err(warp_utils::reject::unhandled_error)?;
let validator_aggregates = validators
.chunks_exact(T::EthSpec::sync_subcommittee_size())
.map(|indices| eth2::types::SyncSubcommittee {
indices: indices.to_vec(),
})
.collect();
let response = eth2::types::SyncCommitteeByValidatorIndices {
validators,
validator_aggregates,
};
Ok(eth2::types::GenericResponse::from(response)
.add_execution_optimistic_finalized(execution_optimistic, finalized))
})
},
)
.boxed()
}
// GET beacon/states/{state_id}/committees?slot,index,epoch
pub fn get_beacon_state_committees<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>,
) -> ResponseFilter {
beacon_states_path
.clone()
.and(warp::path("committees"))
.and(warp::query::<eth2::types::CommitteesQuery>())
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query: eth2::types::CommitteesQuery| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (data, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
let current_epoch = state.current_epoch();
let epoch = query.epoch.unwrap_or(current_epoch);
// Attempt to obtain the committee_cache from the beacon chain
let decision_slot = (epoch.saturating_sub(2u64))
.end_slot(T::EthSpec::slots_per_epoch());
// Find the decision block and skip to another method on any kind
// of failure
let shuffling_id = if let Ok(Some(shuffling_decision_block)) =
chain.block_root_at_slot(decision_slot, WhenSlotSkipped::Prev)
{
Some(AttestationShufflingId {
shuffling_epoch: epoch,
shuffling_decision_block,
})
} else {
None
};
// Attempt to read from the chain cache if there exists a
// shuffling_id
let maybe_cached_shuffling = if let Some(shuffling_id) =
shuffling_id.as_ref()
{
chain
.shuffling_cache
.try_write_for(std::time::Duration::from_secs(1))
.and_then(|mut cache_write| cache_write.get(shuffling_id))
.and_then(|cache_item| cache_item.wait().ok())
} else {
None
};
let committee_cache =
if let Some(shuffling) = maybe_cached_shuffling {
shuffling
} else {
let possibly_built_cache =
match RelativeEpoch::from_epoch(current_epoch, epoch) {
Ok(relative_epoch)
if state.committee_cache_is_initialized(
relative_epoch,
) =>
{
state.committee_cache(relative_epoch).cloned()
}
_ => CommitteeCache::initialized(
state,
epoch,
&chain.spec,
),
}
.map_err(
|e| match e {
BeaconStateError::EpochOutOfBounds => {
let max_sprp =
T::EthSpec::slots_per_historical_root()
as u64;
let first_subsequent_restore_point_slot =
((epoch.start_slot(
T::EthSpec::slots_per_epoch(),
) / max_sprp)
+ 1)
* max_sprp;
if epoch < current_epoch {
warp_utils::reject::custom_bad_request(
format!(
"epoch out of bounds, \
try state at slot {}",
first_subsequent_restore_point_slot,
),
)
} else {
warp_utils::reject::custom_bad_request(
"epoch out of bounds, \
too far in future"
.into(),
)
}
}
_ => warp_utils::reject::unhandled_error(
BeaconChainError::from(e),
),
},
)?;
// Attempt to write to the beacon cache (only if the cache
// size is not the default value).
if chain.config.shuffling_cache_size
!= beacon_chain::shuffling_cache::DEFAULT_CACHE_SIZE
&& let Some(shuffling_id) = shuffling_id
&& let Some(mut cache_write) = chain
.shuffling_cache
.try_write_for(std::time::Duration::from_secs(1))
{
cache_write.insert_committee_cache(
shuffling_id,
&possibly_built_cache,
);
}
possibly_built_cache
};
// Use either the supplied slot or all slots in the epoch.
let slots =
query.slot.map(|slot| vec![slot]).unwrap_or_else(|| {
epoch.slot_iter(T::EthSpec::slots_per_epoch()).collect()
});
// Use either the supplied committee index or all available indices.
let indices =
query.index.map(|index| vec![index]).unwrap_or_else(|| {
(0..committee_cache.committees_per_slot()).collect()
});
let mut response = Vec::with_capacity(slots.len() * indices.len());
for slot in slots {
// It is not acceptable to query with a slot that is not within the
// specified epoch.
if slot.epoch(T::EthSpec::slots_per_epoch()) != epoch {
return Err(warp_utils::reject::custom_bad_request(
format!("{} is not in epoch {}", slot, epoch),
));
}
for &index in &indices {
let committee = committee_cache
.get_beacon_committee(slot, index)
.ok_or_else(|| {
warp_utils::reject::custom_bad_request(format!(
"committee index {} does not exist in epoch {}",
index, epoch
))
})?;
response.push(eth2::types::CommitteeData {
index,
slot,
validators: committee
.committee
.iter()
.map(|i| *i as u64)
.collect(),
});
}
}
Ok((response, execution_optimistic, finalized))
},
)?;
Ok(eth2::types::ExecutionOptimisticFinalizedResponse {
data,
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
})
})
},
)
.boxed()
}
// GET beacon/states/{state_id}/validators/{validator_id}
pub fn get_beacon_state_validators_id<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>,
) -> ResponseFilter {
beacon_states_path
.clone()
.and(warp::path("validators"))
.and(warp::path::param::<ValidatorId>().or_else(|_| async {
Err(warp_utils::reject::custom_bad_request(
"Invalid validator ID".to_string(),
))
}))
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
validator_id: ValidatorId| {
// Prioritise requests for validators at the head. These should be fast to service
// and could be required by the validator client.
let priority = if let StateId(eth2::types::StateId::Head) = state_id {
Priority::P0
} else {
Priority::P1
};
task_spawner.blocking_json_task(priority, move || {
let (data, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
let index_opt = match &validator_id {
ValidatorId::PublicKey(pubkey) => pubkey_to_validator_index(
&chain, state, pubkey,
)
.map_err(|e| {
warp_utils::reject::custom_not_found(format!(
"unable to access pubkey cache: {e:?}",
))
})?,
ValidatorId::Index(index) => Some(*index as usize),
};
Ok((
index_opt
.and_then(|index| {
let validator = state.validators().get(index)?;
let balance = *state.balances().get(index)?;
let epoch = state.current_epoch();
let far_future_epoch = chain.spec.far_future_epoch;
Some(eth2::types::ValidatorData {
index: index as u64,
balance,
status:
eth2::types::ValidatorStatus::from_validator(
validator,
epoch,
far_future_epoch,
),
validator: validator.clone(),
})
})
.ok_or_else(|| {
warp_utils::reject::custom_not_found(format!(
"unknown validator: {}",
validator_id
))
})?,
execution_optimistic,
finalized,
))
},
)?;
Ok(eth2::types::ExecutionOptimisticFinalizedResponse {
data,
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
})
})
},
)
.boxed()
}
// POST beacon/states/{state_id}/validators
pub fn post_beacon_state_validators<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>,
) -> ResponseFilter {
beacon_states_path
.clone()
.and(warp::path("validators"))
.and(warp::path::end())
.and(warp_utils::json::json())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query: ValidatorsRequestBody| {
// Prioritise requests for validators at the head. These should be fast to service
// and could be required by the validator client.
let priority = if let StateId(eth2::types::StateId::Head) = state_id {
Priority::P0
} else {
Priority::P1
};
task_spawner.blocking_json_task(priority, move || {
crate::validators::get_beacon_state_validators(
state_id,
chain,
&query.ids,
&query.statuses,
)
})
},
)
.boxed()
}
// GET beacon/states/{state_id}/validators?id,status
pub fn get_beacon_state_validators<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>,
) -> ResponseFilter {
beacon_states_path
.clone()
.and(warp::path("validators"))
.and(warp::path::end())
.and(multi_key_query::<eth2::types::ValidatorsQuery>())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query_res: Result<eth2::types::ValidatorsQuery, warp::Rejection>| {
// Prioritise requests for validators at the head. These should be fast to service
// and could be required by the validator client.
let priority = if let StateId(eth2::types::StateId::Head) = state_id {
Priority::P0
} else {
Priority::P1
};
task_spawner.blocking_json_task(priority, move || {
let query = query_res?;
crate::validators::get_beacon_state_validators(
state_id,
chain,
&query.id,
&query.status,
)
})
},
)
.boxed()
}
// POST beacon/states/{state_id}/validator_identities
pub fn post_beacon_state_validator_identities<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>,
) -> ResponseFilter {
beacon_states_path
.clone()
.and(warp::path("validator_identities"))
.and(warp::path::end())
.and(warp_utils::json::json_no_body())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query: ValidatorIdentitiesRequestBody| {
// Prioritise requests for validators at the head. These should be fast to service
// and could be required by the validator client.
let priority = if let StateId(eth2::types::StateId::Head) = state_id {
Priority::P0
} else {
Priority::P1
};
task_spawner.blocking_json_task(priority, move || {
crate::validators::get_beacon_state_validator_identities(
state_id,
chain,
Some(&query.ids),
)
})
},
)
.boxed()
}
// POST beacon/states/{state_id}/validator_balances
pub fn post_beacon_state_validator_balances<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>,
) -> ResponseFilter {
beacon_states_path
.clone()
.and(warp::path("validator_balances"))
.and(warp::path::end())
.and(warp_utils::json::json_no_body())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query: ValidatorBalancesRequestBody| {
task_spawner.blocking_json_task(Priority::P1, move || {
crate::validators::get_beacon_state_validator_balances(
state_id,
chain,
Some(&query.ids),
)
})
},
)
.boxed()
}
// GET beacon/states/{state_id}/validator_balances?id
pub fn get_beacon_state_validator_balances<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>,
) -> ResponseFilter {
beacon_states_path
.clone()
.and(warp::path("validator_balances"))
.and(warp::path::end())
.and(multi_key_query::<eth2::types::ValidatorBalancesQuery>())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query_res: Result<eth2::types::ValidatorBalancesQuery, warp::Rejection>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let query = query_res?;
crate::validators::get_beacon_state_validator_balances(
state_id,
chain,
query.id.as_deref(),
)
})
},
)
.boxed()
}
// GET beacon/states/{state_id}/finality_checkpoints
pub fn get_beacon_state_finality_checkpoints<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>,
) -> ResponseFilter {
beacon_states_path
.clone()
.and(warp::path("finality_checkpoints"))
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (data, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
Ok((
eth2::types::FinalityCheckpointsData {
previous_justified: state.previous_justified_checkpoint(),
current_justified: state.current_justified_checkpoint(),
finalized: state.finalized_checkpoint(),
},
execution_optimistic,
finalized,
))
},
)?;
Ok(eth2::types::ExecutionOptimisticFinalizedResponse {
data,
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
})
})
},
)
.boxed()
}
// GET beacon/states/{state_id}/fork
pub fn get_beacon_state_fork<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>,
) -> ResponseFilter {
beacon_states_path
.clone()
.and(warp::path("fork"))
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (fork, execution_optimistic, finalized) =
state_id.fork_and_execution_optimistic_and_finalized(&chain)?;
Ok(eth2::types::ExecutionOptimisticFinalizedResponse {
data: fork,
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
})
})
},
)
.boxed()
}
// GET beacon/states/{state_id}/root
pub fn get_beacon_state_root<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>,
) -> ResponseFilter {
beacon_states_path
.and(warp::path("root"))
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (root, execution_optimistic, finalized) = state_id.root(&chain)?;
Ok(eth2::types::GenericResponse::from(
eth2::types::RootData::from(root),
))
.map(|resp| {
resp.add_execution_optimistic_finalized(execution_optimistic, finalized)
})
})
},
)
.boxed()
}

View File

@@ -8,6 +8,7 @@
mod aggregate_attestation;
mod attestation_performance;
mod attester_duties;
mod beacon;
mod block_id;
mod block_packing_efficiency;
mod block_rewards;
@@ -29,13 +30,16 @@ mod sync_committees;
mod task_spawner;
pub mod test_utils;
mod ui;
mod utils;
mod validator;
mod validator_inclusion;
mod validators;
mod version;
use crate::light_client::{get_light_client_bootstrap, get_light_client_updates};
use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3};
use crate::version::beacon_response;
use beacon::states;
use beacon_chain::{
AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes,
WhenSlotSkipped, attestation_verification::VerifiedAttestation,
@@ -50,8 +54,7 @@ use eth2::StatusCode;
use eth2::types::{
self as api_types, BroadcastValidation, ContextDeserialize, EndpointVersion, ForkChoice,
ForkChoiceExtraData, ForkChoiceNode, LightClientUpdatesQuery, PublishBlockRequest,
StateId as CoreStateId, ValidatorBalancesRequestBody, ValidatorId,
ValidatorIdentitiesRequestBody, ValidatorStatus, ValidatorsRequestBody,
StateId as CoreStateId, ValidatorId, ValidatorStatus,
};
use eth2::{CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
use health_metrics::observe::Observe;
@@ -90,14 +93,12 @@ use tokio_stream::{
};
use tracing::{debug, error, info, warn};
use types::{
Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError,
ChainSpec, Checkpoint, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, Hash256,
ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof,
SignedBlindedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof,
SignedValidatorRegistrationData, SignedVoluntaryExit, SingleAttestation, Slot,
SyncCommitteeMessage, SyncContributionData,
Attestation, AttestationData, AttesterSlashing, BeaconStateError, ChainSpec, Checkpoint,
ConfigAndPreset, Epoch, EthSpec, ForkName, Hash256, ProposerPreparationData, ProposerSlashing,
SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit,
SingleAttestation, Slot, SyncCommitteeMessage, SyncContributionData,
};
use validator::pubkey_to_validator_index;
use version::{
ResponseIncludesVersion, V1, V2, V3, add_consensus_version_header, add_ssz_content_type_header,
execution_optimistic_finalized_beacon_response, inconsistent_fork_rejection,
@@ -583,693 +584,65 @@ pub fn serve<T: BeaconChainTypes>(
))
}))
.and(task_spawner_filter.clone())
.and(chain_filter.clone());
.and(chain_filter.clone())
.boxed();
// GET beacon/states/{state_id}/root
let get_beacon_state_root = beacon_states_path
.clone()
.and(warp::path("root"))
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (root, execution_optimistic, finalized) = state_id.root(&chain)?;
Ok(api_types::GenericResponse::from(api_types::RootData::from(
root,
)))
.map(|resp| {
resp.add_execution_optimistic_finalized(execution_optimistic, finalized)
})
})
},
);
let get_beacon_state_root = states::get_beacon_state_root(beacon_states_path.clone());
// GET beacon/states/{state_id}/fork
let get_beacon_state_fork = beacon_states_path
.clone()
.and(warp::path("fork"))
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (fork, execution_optimistic, finalized) =
state_id.fork_and_execution_optimistic_and_finalized(&chain)?;
Ok(api_types::ExecutionOptimisticFinalizedResponse {
data: fork,
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
})
})
},
);
let get_beacon_state_fork = states::get_beacon_state_fork(beacon_states_path.clone());
// GET beacon/states/{state_id}/finality_checkpoints
let get_beacon_state_finality_checkpoints = beacon_states_path
.clone()
.and(warp::path("finality_checkpoints"))
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (data, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
Ok((
api_types::FinalityCheckpointsData {
previous_justified: state.previous_justified_checkpoint(),
current_justified: state.current_justified_checkpoint(),
finalized: state.finalized_checkpoint(),
},
execution_optimistic,
finalized,
))
},
)?;
Ok(api_types::ExecutionOptimisticFinalizedResponse {
data,
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
})
})
},
);
let get_beacon_state_finality_checkpoints =
states::get_beacon_state_finality_checkpoints(beacon_states_path.clone());
// GET beacon/states/{state_id}/validator_balances?id
let get_beacon_state_validator_balances = beacon_states_path
.clone()
.and(warp::path("validator_balances"))
.and(warp::path::end())
.and(multi_key_query::<api_types::ValidatorBalancesQuery>())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query_res: Result<api_types::ValidatorBalancesQuery, warp::Rejection>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let query = query_res?;
crate::validators::get_beacon_state_validator_balances(
state_id,
chain,
query.id.as_deref(),
)
})
},
);
let get_beacon_state_validator_balances =
states::get_beacon_state_validator_balances(beacon_states_path.clone());
// POST beacon/states/{state_id}/validator_balances
let post_beacon_state_validator_balances = beacon_states_path
.clone()
.and(warp::path("validator_balances"))
.and(warp::path::end())
.and(warp_utils::json::json_no_body())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query: ValidatorBalancesRequestBody| {
task_spawner.blocking_json_task(Priority::P1, move || {
crate::validators::get_beacon_state_validator_balances(
state_id,
chain,
Some(&query.ids),
)
})
},
);
let post_beacon_state_validator_balances =
states::post_beacon_state_validator_balances(beacon_states_path.clone());
// POST beacon/states/{state_id}/validator_identities
let post_beacon_state_validator_identities = beacon_states_path
.clone()
.and(warp::path("validator_identities"))
.and(warp::path::end())
.and(warp_utils::json::json_no_body())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query: ValidatorIdentitiesRequestBody| {
// Prioritise requests for validators at the head. These should be fast to service
// and could be required by the validator client.
let priority = if let StateId(eth2::types::StateId::Head) = state_id {
Priority::P0
} else {
Priority::P1
};
task_spawner.blocking_json_task(priority, move || {
crate::validators::get_beacon_state_validator_identities(
state_id,
chain,
Some(&query.ids),
)
})
},
);
let post_beacon_state_validator_identities =
states::post_beacon_state_validator_identities(beacon_states_path.clone());
// GET beacon/states/{state_id}/validators?id,status
let get_beacon_state_validators = beacon_states_path
.clone()
.and(warp::path("validators"))
.and(warp::path::end())
.and(multi_key_query::<api_types::ValidatorsQuery>())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query_res: Result<api_types::ValidatorsQuery, warp::Rejection>| {
// Prioritise requests for validators at the head. These should be fast to service
// and could be required by the validator client.
let priority = if let StateId(eth2::types::StateId::Head) = state_id {
Priority::P0
} else {
Priority::P1
};
task_spawner.blocking_json_task(priority, move || {
let query = query_res?;
crate::validators::get_beacon_state_validators(
state_id,
chain,
&query.id,
&query.status,
)
})
},
);
let get_beacon_state_validators =
states::get_beacon_state_validators(beacon_states_path.clone());
// POST beacon/states/{state_id}/validators
let post_beacon_state_validators = beacon_states_path
.clone()
.and(warp::path("validators"))
.and(warp::path::end())
.and(warp_utils::json::json())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query: ValidatorsRequestBody| {
// Prioritise requests for validators at the head. These should be fast to service
// and could be required by the validator client.
let priority = if let StateId(eth2::types::StateId::Head) = state_id {
Priority::P0
} else {
Priority::P1
};
task_spawner.blocking_json_task(priority, move || {
crate::validators::get_beacon_state_validators(
state_id,
chain,
&query.ids,
&query.statuses,
)
})
},
);
let post_beacon_state_validators =
states::post_beacon_state_validators(beacon_states_path.clone());
// GET beacon/states/{state_id}/validators/{validator_id}
let get_beacon_state_validators_id = beacon_states_path
.clone()
.and(warp::path("validators"))
.and(warp::path::param::<ValidatorId>().or_else(|_| async {
Err(warp_utils::reject::custom_bad_request(
"Invalid validator ID".to_string(),
))
}))
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
validator_id: ValidatorId| {
// Prioritise requests for validators at the head. These should be fast to service
// and could be required by the validator client.
let priority = if let StateId(eth2::types::StateId::Head) = state_id {
Priority::P0
} else {
Priority::P1
};
task_spawner.blocking_json_task(priority, move || {
let (data, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
let index_opt = match &validator_id {
ValidatorId::PublicKey(pubkey) => pubkey_to_validator_index(
&chain, state, pubkey,
)
.map_err(|e| {
warp_utils::reject::custom_not_found(format!(
"unable to access pubkey cache: {e:?}",
))
})?,
ValidatorId::Index(index) => Some(*index as usize),
};
Ok((
index_opt
.and_then(|index| {
let validator = state.validators().get(index)?;
let balance = *state.balances().get(index)?;
let epoch = state.current_epoch();
let far_future_epoch = chain.spec.far_future_epoch;
Some(api_types::ValidatorData {
index: index as u64,
balance,
status: api_types::ValidatorStatus::from_validator(
validator,
epoch,
far_future_epoch,
),
validator: validator.clone(),
})
})
.ok_or_else(|| {
warp_utils::reject::custom_not_found(format!(
"unknown validator: {}",
validator_id
))
})?,
execution_optimistic,
finalized,
))
},
)?;
Ok(api_types::ExecutionOptimisticFinalizedResponse {
data,
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
})
})
},
);
let get_beacon_state_validators_id =
states::get_beacon_state_validators_id(beacon_states_path.clone());
// GET beacon/states/{state_id}/committees?slot,index,epoch
let get_beacon_state_committees = beacon_states_path
.clone()
.and(warp::path("committees"))
.and(warp::query::<api_types::CommitteesQuery>())
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query: api_types::CommitteesQuery| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (data, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
let current_epoch = state.current_epoch();
let epoch = query.epoch.unwrap_or(current_epoch);
// Attempt to obtain the committee_cache from the beacon chain
let decision_slot = (epoch.saturating_sub(2u64))
.end_slot(T::EthSpec::slots_per_epoch());
// Find the decision block and skip to another method on any kind
// of failure
let shuffling_id = if let Ok(Some(shuffling_decision_block)) =
chain.block_root_at_slot(decision_slot, WhenSlotSkipped::Prev)
{
Some(AttestationShufflingId {
shuffling_epoch: epoch,
shuffling_decision_block,
})
} else {
None
};
// Attempt to read from the chain cache if there exists a
// shuffling_id
let maybe_cached_shuffling = if let Some(shuffling_id) =
shuffling_id.as_ref()
{
chain
.shuffling_cache
.try_write_for(std::time::Duration::from_secs(1))
.and_then(|mut cache_write| cache_write.get(shuffling_id))
.and_then(|cache_item| cache_item.wait().ok())
} else {
None
};
let committee_cache =
if let Some(shuffling) = maybe_cached_shuffling {
shuffling
} else {
let possibly_built_cache =
match RelativeEpoch::from_epoch(current_epoch, epoch) {
Ok(relative_epoch)
if state.committee_cache_is_initialized(
relative_epoch,
) =>
{
state.committee_cache(relative_epoch).cloned()
}
_ => CommitteeCache::initialized(
state,
epoch,
&chain.spec,
),
}
.map_err(
|e| match e {
BeaconStateError::EpochOutOfBounds => {
let max_sprp =
T::EthSpec::slots_per_historical_root()
as u64;
let first_subsequent_restore_point_slot =
((epoch.start_slot(
T::EthSpec::slots_per_epoch(),
) / max_sprp)
+ 1)
* max_sprp;
if epoch < current_epoch {
warp_utils::reject::custom_bad_request(
format!(
"epoch out of bounds, \
try state at slot {}",
first_subsequent_restore_point_slot,
),
)
} else {
warp_utils::reject::custom_bad_request(
"epoch out of bounds, \
too far in future"
.into(),
)
}
}
_ => warp_utils::reject::unhandled_error(
BeaconChainError::from(e),
),
},
)?;
// Attempt to write to the beacon cache (only if the cache
// size is not the default value).
if chain.config.shuffling_cache_size
!= beacon_chain::shuffling_cache::DEFAULT_CACHE_SIZE
&& let Some(shuffling_id) = shuffling_id
&& let Some(mut cache_write) = chain
.shuffling_cache
.try_write_for(std::time::Duration::from_secs(1))
{
cache_write.insert_committee_cache(
shuffling_id,
&possibly_built_cache,
);
}
possibly_built_cache
};
// Use either the supplied slot or all slots in the epoch.
let slots =
query.slot.map(|slot| vec![slot]).unwrap_or_else(|| {
epoch.slot_iter(T::EthSpec::slots_per_epoch()).collect()
});
// Use either the supplied committee index or all available indices.
let indices =
query.index.map(|index| vec![index]).unwrap_or_else(|| {
(0..committee_cache.committees_per_slot()).collect()
});
let mut response = Vec::with_capacity(slots.len() * indices.len());
for slot in slots {
// It is not acceptable to query with a slot that is not within the
// specified epoch.
if slot.epoch(T::EthSpec::slots_per_epoch()) != epoch {
return Err(warp_utils::reject::custom_bad_request(
format!("{} is not in epoch {}", slot, epoch),
));
}
for &index in &indices {
let committee = committee_cache
.get_beacon_committee(slot, index)
.ok_or_else(|| {
warp_utils::reject::custom_bad_request(format!(
"committee index {} does not exist in epoch {}",
index, epoch
))
})?;
response.push(api_types::CommitteeData {
index,
slot,
validators: committee
.committee
.iter()
.map(|i| *i as u64)
.collect(),
});
}
}
Ok((response, execution_optimistic, finalized))
},
)?;
Ok(api_types::ExecutionOptimisticFinalizedResponse {
data,
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
})
})
},
);
let get_beacon_state_committees =
states::get_beacon_state_committees(beacon_states_path.clone());
// GET beacon/states/{state_id}/sync_committees?epoch
let get_beacon_state_sync_committees = beacon_states_path
.clone()
.and(warp::path("sync_committees"))
.and(warp::query::<api_types::SyncCommitteesQuery>())
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query: api_types::SyncCommitteesQuery| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (sync_committee, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
let current_epoch = state.current_epoch();
let epoch = query.epoch.unwrap_or(current_epoch);
Ok((
state
.get_built_sync_committee(epoch, &chain.spec)
.cloned()
.map_err(|e| match e {
BeaconStateError::SyncCommitteeNotKnown { .. } => {
warp_utils::reject::custom_bad_request(format!(
"state at epoch {} has no \
sync committee for epoch {}",
current_epoch, epoch
))
}
BeaconStateError::IncorrectStateVariant => {
warp_utils::reject::custom_bad_request(format!(
"state at epoch {} is not activated for Altair",
current_epoch,
))
}
e => warp_utils::reject::beacon_state_error(e),
})?,
execution_optimistic,
finalized,
))
},
)?;
let validators = chain
.validator_indices(sync_committee.pubkeys.iter())
.map_err(warp_utils::reject::unhandled_error)?;
let validator_aggregates = validators
.chunks_exact(T::EthSpec::sync_subcommittee_size())
.map(|indices| api_types::SyncSubcommittee {
indices: indices.to_vec(),
})
.collect();
let response = api_types::SyncCommitteeByValidatorIndices {
validators,
validator_aggregates,
};
Ok(api_types::GenericResponse::from(response)
.add_execution_optimistic_finalized(execution_optimistic, finalized))
})
},
);
let get_beacon_state_sync_committees =
states::get_beacon_state_sync_committees(beacon_states_path.clone());
// GET beacon/states/{state_id}/randao?epoch
let get_beacon_state_randao = beacon_states_path
.clone()
.and(warp::path("randao"))
.and(warp::query::<api_types::RandaoQuery>())
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query: api_types::RandaoQuery| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (randao, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
let epoch = query.epoch.unwrap_or_else(|| state.current_epoch());
let randao = *state.get_randao_mix(epoch).map_err(|e| {
warp_utils::reject::custom_bad_request(format!(
"epoch out of range: {e:?}"
))
})?;
Ok((randao, execution_optimistic, finalized))
},
)?;
Ok(
api_types::GenericResponse::from(api_types::RandaoMix { randao })
.add_execution_optimistic_finalized(execution_optimistic, finalized),
)
})
},
);
let get_beacon_state_randao = states::get_beacon_state_randao(beacon_states_path.clone());
// GET beacon/states/{state_id}/pending_deposits
let get_beacon_state_pending_deposits = beacon_states_path
.clone()
.and(warp::path("pending_deposits"))
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_response_task(Priority::P1, move || {
let (data, execution_optimistic, finalized, fork_name) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
let Ok(deposits) = state.pending_deposits() else {
return Err(warp_utils::reject::custom_bad_request(
"Pending deposits not found".to_string(),
));
};
Ok((
deposits.clone(),
execution_optimistic,
finalized,
state.fork_name_unchecked(),
))
},
)?;
execution_optimistic_finalized_beacon_response(
ResponseIncludesVersion::Yes(fork_name),
execution_optimistic,
finalized,
data,
)
.map(|res| warp::reply::json(&res).into_response())
.map(|resp| add_consensus_version_header(resp, fork_name))
})
},
);
let get_beacon_state_pending_deposits =
states::get_beacon_state_pending_deposits(beacon_states_path.clone());
// GET beacon/states/{state_id}/pending_partial_withdrawals
let get_beacon_state_pending_partial_withdrawals = beacon_states_path
.clone()
.and(warp::path("pending_partial_withdrawals"))
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_response_task(Priority::P1, move || {
let (data, execution_optimistic, finalized, fork_name) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
let Ok(withdrawals) = state.pending_partial_withdrawals() else {
return Err(warp_utils::reject::custom_bad_request(
"Pending withdrawals not found".to_string(),
));
};
Ok((
withdrawals.clone(),
execution_optimistic,
finalized,
state.fork_name_unchecked(),
))
},
)?;
execution_optimistic_finalized_beacon_response(
ResponseIncludesVersion::Yes(fork_name),
execution_optimistic,
finalized,
data,
)
.map(|res| warp::reply::json(&res).into_response())
.map(|resp| add_consensus_version_header(resp, fork_name))
})
},
);
let get_beacon_state_pending_partial_withdrawals =
states::get_beacon_state_pending_partial_withdrawals(beacon_states_path.clone());
// GET beacon/states/{state_id}/pending_consolidations
let get_beacon_state_pending_consolidations = beacon_states_path
.clone()
.and(warp::path("pending_consolidations"))
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_response_task(Priority::P1, move || {
let (data, execution_optimistic, finalized, fork_name) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
let Ok(consolidations) = state.pending_consolidations() else {
return Err(warp_utils::reject::custom_bad_request(
"Pending consolidations not found".to_string(),
));
};
Ok((
consolidations.clone(),
execution_optimistic,
finalized,
state.fork_name_unchecked(),
))
},
)?;
execution_optimistic_finalized_beacon_response(
ResponseIncludesVersion::Yes(fork_name),
execution_optimistic,
finalized,
data,
)
.map(|res| warp::reply::json(&res).into_response())
.map(|resp| add_consensus_version_header(resp, fork_name))
})
},
);
let get_beacon_state_pending_consolidations =
states::get_beacon_state_pending_consolidations(beacon_states_path.clone());
// GET beacon/headers
//

View File

@@ -0,0 +1,3 @@
use warp::filters::BoxedFilter;
pub type ResponseFilter = BoxedFilter<(warp::reply::Response,)>;