ContextDeserialize and Beacon API Improvements (#7372)

* #7286
* BeaconAPI is not returning a versioned response when it should for some V1 endpoints
* these [strange functions with vX in the name that still accept `endpoint_version` arguments](https://github.com/sigp/lighthouse/blob/stable/beacon_node/http_api/src/produce_block.rs#L192)

This refactor is a prerequisite to get the fulu EF tests running.
This commit is contained in:
ethDreamer
2025-05-19 00:05:16 -05:00
committed by GitHub
parent fcfcbf9a11
commit 7684d1f866
92 changed files with 1863 additions and 827 deletions

View File

@@ -34,7 +34,7 @@ mod validators;
mod version;
use crate::light_client::{get_light_client_bootstrap, get_light_client_updates};
use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3};
use crate::version::fork_versioned_response;
use crate::version::beacon_response;
use beacon_chain::{
attestation_verification::VerifiedAttestation, observed_operations::ObservationOutcome,
validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError,
@@ -47,9 +47,9 @@ use bytes::Bytes;
use directory::DEFAULT_ROOT_DIR;
use either::Either;
use eth2::types::{
self as api_types, BroadcastValidation, EndpointVersion, ForkChoice, ForkChoiceNode,
LightClientUpdatesQuery, PublishBlockRequest, ValidatorBalancesRequestBody, ValidatorId,
ValidatorStatus, ValidatorsRequestBody,
self as api_types, BroadcastValidation, ContextDeserialize, EndpointVersion, ForkChoice,
ForkChoiceNode, LightClientUpdatesQuery, PublishBlockRequest, ValidatorBalancesRequestBody,
ValidatorId, ValidatorStatus, ValidatorsRequestBody,
};
use eth2::{CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
use health_metrics::observe::Observe;
@@ -89,18 +89,17 @@ use tokio_stream::{
use tracing::{debug, error, info, warn};
use types::AttestationData;
use types::{
fork_versioned_response::EmptyMetadata, Attestation, AttestationShufflingId, AttesterSlashing,
BeaconStateError, ChainSpec, Checkpoint, CommitteeCache, ConfigAndPreset, Epoch, EthSpec,
ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing,
RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
SyncCommitteeMessage, SyncContributionData,
Attestation, AttestationShufflingId, AttesterSlashing, BeaconStateError, ChainSpec, Checkpoint,
CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, Hash256, ProposerPreparationData,
ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock,
SignedBlsToExecutionChange, SignedContributionAndProof, SignedValidatorRegistrationData,
SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData,
};
use validator::pubkey_to_validator_index;
use version::{
add_consensus_version_header, add_ssz_content_type_header,
execution_optimistic_finalized_fork_versioned_response, inconsistent_fork_rejection,
unsupported_version_rejection, V1, V2, V3,
execution_optimistic_finalized_beacon_response, inconsistent_fork_rejection,
unsupported_version_rejection, ResponseIncludesVersion, V1, V2, V3,
};
use warp::http::StatusCode;
use warp::hyper::Body;
@@ -1153,8 +1152,8 @@ pub fn serve<T: BeaconChainTypes>(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (data, execution_optimistic, finalized) = state_id
task_spawner.blocking_response_task(Priority::P1, move || {
let (data, execution_optimistic, finalized, fork_name) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
@@ -1164,15 +1163,23 @@ pub fn serve<T: BeaconChainTypes>(
));
};
Ok((deposits.clone(), execution_optimistic, finalized))
Ok((
deposits.clone(),
execution_optimistic,
finalized,
state.fork_name_unchecked(),
))
},
)?;
Ok(api_types::ExecutionOptimisticFinalizedResponse {
execution_optimistic_finalized_beacon_response(
ResponseIncludesVersion::Yes(fork_name),
execution_optimistic,
finalized,
data,
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
})
)
.map(|res| warp::reply::json(&res).into_response())
.map(|resp| add_consensus_version_header(resp, fork_name))
})
},
);
@@ -1186,8 +1193,8 @@ pub fn serve<T: BeaconChainTypes>(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (data, execution_optimistic, finalized) = state_id
task_spawner.blocking_response_task(Priority::P1, move || {
let (data, execution_optimistic, finalized, fork_name) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
@@ -1197,15 +1204,23 @@ pub fn serve<T: BeaconChainTypes>(
));
};
Ok((withdrawals.clone(), execution_optimistic, finalized))
Ok((
withdrawals.clone(),
execution_optimistic,
finalized,
state.fork_name_unchecked(),
))
},
)?;
Ok(api_types::ExecutionOptimisticFinalizedResponse {
execution_optimistic_finalized_beacon_response(
ResponseIncludesVersion::Yes(fork_name),
execution_optimistic,
finalized,
data,
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
})
)
.map(|res| warp::reply::json(&res).into_response())
.map(|resp| add_consensus_version_header(resp, fork_name))
})
},
);
@@ -1405,21 +1420,30 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("beacon"))
.and(warp::path("blocks"))
.and(warp::path::end())
.and(warp_utils::json::json())
.and(warp::body::json())
.and(consensus_version_header_filter)
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(network_globals.clone())
.then(
move |block_contents: PublishBlockRequest<T::EthSpec>,
move |value: serde_json::Value,
consensus_version: ForkName,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let request = PublishBlockRequest::<T::EthSpec>::context_deserialize(
&value,
consensus_version,
)
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!("invalid JSON: {e:?}"))
})?;
publish_blocks::publish_block(
None,
ProvenancedBlock::local_from_publish_request(block_contents),
ProvenancedBlock::local_from_publish_request(request),
chain,
&network_tx,
BroadcastValidation::default(),
@@ -1475,22 +1499,32 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("blocks"))
.and(warp::query::<api_types::BroadcastValidationQuery>())
.and(warp::path::end())
.and(warp_utils::json::json())
.and(warp::body::json())
.and(consensus_version_header_filter)
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(network_globals.clone())
.then(
move |validation_level: api_types::BroadcastValidationQuery,
block_contents: PublishBlockRequest<T::EthSpec>,
value: serde_json::Value,
consensus_version: ForkName,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let request = PublishBlockRequest::<T::EthSpec>::context_deserialize(
&value,
consensus_version,
)
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!("invalid JSON: {e:?}"))
})?;
publish_blocks::publish_block(
None,
ProvenancedBlock::local_from_publish_request(block_contents),
ProvenancedBlock::local_from_publish_request(request),
chain,
&network_tx,
validation_level.broadcast_validation,
@@ -1723,6 +1757,12 @@ pub fn serve<T: BeaconChainTypes>(
.fork_name(&chain.spec)
.map_err(inconsistent_fork_rejection)?;
let require_version = match endpoint_version {
V1 => ResponseIncludesVersion::No,
V2 => ResponseIncludesVersion::Yes(fork_name),
_ => return Err(unsupported_version_rejection(endpoint_version)),
};
match accept_header {
Some(api_types::Accept::Ssz) => Response::builder()
.status(200)
@@ -1734,9 +1774,8 @@ pub fn serve<T: BeaconChainTypes>(
e
))
}),
_ => execution_optimistic_finalized_fork_versioned_response(
endpoint_version,
fork_name,
_ => execution_optimistic_finalized_beacon_response(
require_version,
execution_optimistic,
finalized,
block,
@@ -1796,9 +1835,15 @@ pub fn serve<T: BeaconChainTypes>(
.attestations()
.map(|att| att.clone_as_attestation())
.collect::<Vec<_>>();
let res = execution_optimistic_finalized_fork_versioned_response(
endpoint_version,
fork_name,
let require_version = match endpoint_version {
V1 => ResponseIncludesVersion::No,
V2 => ResponseIncludesVersion::Yes(fork_name),
_ => return Err(unsupported_version_rejection(endpoint_version)),
};
let res = execution_optimistic_finalized_beacon_response(
require_version,
execution_optimistic,
finalized,
&atts,
@@ -1845,9 +1890,8 @@ pub fn serve<T: BeaconChainTypes>(
}),
_ => {
// Post as a V2 endpoint so we return the fork version.
execution_optimistic_finalized_fork_versioned_response(
V2,
fork_name,
execution_optimistic_finalized_beacon_response(
ResponseIncludesVersion::Yes(fork_name),
execution_optimistic,
finalized,
block,
@@ -1901,9 +1945,8 @@ pub fn serve<T: BeaconChainTypes>(
}),
_ => {
// Post as a V2 endpoint so we return the fork version.
let res = execution_optimistic_finalized_fork_versioned_response(
V2,
fork_name,
let res = execution_optimistic_finalized_beacon_response(
ResponseIncludesVersion::Yes(fork_name),
execution_optimistic,
finalized,
&blob_sidecar_list_filtered,
@@ -2063,7 +2106,13 @@ pub fn serve<T: BeaconChainTypes>(
})
.collect::<Vec<_>>();
let res = fork_versioned_response(endpoint_version, fork_name, &attestations)?;
let require_version = match endpoint_version {
V1 => ResponseIncludesVersion::No,
V2 => ResponseIncludesVersion::Yes(fork_name),
_ => return Err(unsupported_version_rejection(endpoint_version)),
};
let res = beacon_response(require_version, &attestations);
Ok(add_consensus_version_header(
warp::reply::json(&res).into_response(),
fork_name,
@@ -2152,7 +2201,13 @@ pub fn serve<T: BeaconChainTypes>(
})
.collect::<Vec<_>>();
let res = fork_versioned_response(endpoint_version, fork_name, &slashings)?;
let require_version = match endpoint_version {
V1 => ResponseIncludesVersion::No,
V2 => ResponseIncludesVersion::Yes(fork_name),
_ => return Err(unsupported_version_rejection(endpoint_version)),
};
let res = beacon_response(require_version, &slashings);
Ok(add_consensus_version_header(
warp::reply::json(&res).into_response(),
fork_name,
@@ -2588,7 +2643,7 @@ pub fn serve<T: BeaconChainTypes>(
let fork_name = chain
.spec
.fork_name_at_slot::<T::EthSpec>(*update.signature_slot());
.fork_name_at_slot::<T::EthSpec>(update.get_slot());
match accept_header {
Some(api_types::Accept::Ssz) => Response::builder()
.status(200)
@@ -2600,11 +2655,10 @@ pub fn serve<T: BeaconChainTypes>(
e
))
}),
_ => Ok(warp::reply::json(&ForkVersionedResponse {
version: Some(fork_name),
metadata: EmptyMetadata {},
data: update,
})
_ => Ok(warp::reply::json(&beacon_response(
ResponseIncludesVersion::Yes(fork_name),
update,
))
.into_response()),
}
.map(|resp| add_consensus_version_header(resp, fork_name))
@@ -2649,11 +2703,10 @@ pub fn serve<T: BeaconChainTypes>(
e
))
}),
_ => Ok(warp::reply::json(&ForkVersionedResponse {
version: Some(fork_name),
metadata: EmptyMetadata {},
data: update,
})
_ => Ok(warp::reply::json(&beacon_response(
ResponseIncludesVersion::Yes(fork_name),
update,
))
.into_response()),
}
.map(|resp| add_consensus_version_header(resp, fork_name))
@@ -2845,7 +2898,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
|endpoint_version: EndpointVersion,
|_endpoint_version: EndpointVersion,
state_id: StateId,
accept_header: Option<api_types::Accept>,
task_spawner: TaskSpawner<T::EthSpec>,
@@ -2889,9 +2942,8 @@ pub fn serve<T: BeaconChainTypes>(
let fork_name = state
.fork_name(&chain.spec)
.map_err(inconsistent_fork_rejection)?;
let res = execution_optimistic_finalized_fork_versioned_response(
endpoint_version,
fork_name,
let res = execution_optimistic_finalized_beacon_response(
ResponseIncludesVersion::Yes(fork_name),
execution_optimistic,
finalized,
&state,
@@ -3372,7 +3424,7 @@ pub fn serve<T: BeaconChainTypes>(
if endpoint_version == V3 {
produce_block_v3(accept_header, chain, slot, query).await
} else {
produce_block_v2(endpoint_version, accept_header, chain, slot, query).await
produce_block_v2(accept_header, chain, slot, query).await
}
})
},
@@ -3402,8 +3454,7 @@ pub fn serve<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
not_synced_filter?;
produce_blinded_block_v2(EndpointVersion(2), accept_header, chain, slot, query)
.await
produce_blinded_block_v2(accept_header, chain, slot, query).await
})
},
);