resolve merge conflict and migrate il service to new pardigmn

This commit is contained in:
Eitan Seri-Levi
2025-05-21 12:43:43 -07:00
358 changed files with 11541 additions and 6759 deletions

View File

@@ -4,7 +4,7 @@ use crate::version::{add_consensus_version_header, V1, V2};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2::types::{self, EndpointVersion, Hash256, Slot};
use std::sync::Arc;
use types::fork_versioned_response::EmptyMetadata;
use types::beacon_response::EmptyMetadata;
use types::{CommitteeIndex, ForkVersionedResponse};
use warp::{
hyper::{Body, Response},
@@ -52,7 +52,7 @@ pub fn get_aggregate_attestation<T: BeaconChainTypes>(
if endpoint_version == V2 {
let fork_versioned_response = ForkVersionedResponse {
version: Some(fork_name),
version: fork_name,
metadata: EmptyMetadata {},
data: aggregate_attestation,
};

View File

@@ -36,7 +36,7 @@ mod validators;
mod version;
use crate::light_client::{get_light_client_bootstrap, get_light_client_updates};
use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3};
use crate::version::fork_versioned_response;
use crate::version::beacon_response;
use beacon_chain::{
attestation_verification::VerifiedAttestation, observed_operations::ObservationOutcome,
validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError,
@@ -49,9 +49,9 @@ use bytes::Bytes;
use directory::DEFAULT_ROOT_DIR;
use either::Either;
use eth2::types::{
self as api_types, BroadcastValidation, EndpointVersion, ForkChoice, ForkChoiceNode,
LightClientUpdatesQuery, PublishBlockRequest, ValidatorBalancesRequestBody, ValidatorId,
ValidatorStatus, ValidatorsRequestBody,
self as api_types, BroadcastValidation, ContextDeserialize, EndpointVersion, ForkChoice,
ForkChoiceNode, LightClientUpdatesQuery, PublishBlockRequest, ValidatorBalancesRequestBody,
ValidatorId, ValidatorStatus, ValidatorsRequestBody,
};
use eth2::{CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
use health_metrics::observe::Observe;
@@ -70,6 +70,7 @@ use serde_json::Value;
use slot_clock::SlotClock;
use ssz::Encode;
pub use state_id::StateId;
use std::collections::HashSet;
use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::PathBuf;
@@ -88,10 +89,10 @@ use tokio_stream::{
StreamExt,
};
use tracing::{debug, error, info, warn};
use types::AttestationData;
use types::{
fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId,
AttesterSlashing, BeaconStateError, ChainSpec, Checkpoint, CommitteeCache, ConfigAndPreset,
Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData,
Attestation, AttestationShufflingId, AttesterSlashing, BeaconStateError, ChainSpec, Checkpoint,
CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, Hash256, ProposerPreparationData,
ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock,
SignedBlsToExecutionChange, SignedContributionAndProof, SignedInclusionList,
SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage,
@@ -100,8 +101,8 @@ use types::{
use validator::pubkey_to_validator_index;
use version::{
add_consensus_version_header, add_ssz_content_type_header,
execution_optimistic_finalized_fork_versioned_response, inconsistent_fork_rejection,
unsupported_version_rejection, V1, V2, V3,
execution_optimistic_finalized_beacon_response, inconsistent_fork_rejection,
unsupported_version_rejection, ResponseIncludesVersion, V1, V2, V3,
};
use warp::http::StatusCode;
use warp::hyper::Body;
@@ -216,37 +217,67 @@ pub fn prometheus_metrics() -> warp::filters::log::Log<impl Fn(warp::filters::lo
// First line covers `POST /v1/beacon/blocks` only
equals("v1/beacon/blocks")
.or_else(|| starts_with("v1/validator/blocks"))
.or_else(|| starts_with("v2/validator/blocks"))
.or_else(|| starts_with("v1/validator/blinded_blocks"))
.or_else(|| starts_with("v1/validator/duties/attester"))
.or_else(|| starts_with("v1/validator/duties/proposer"))
.or_else(|| starts_with("v1/validator/duties/sync"))
.or_else(|| starts_with("v1/validator/duties/inclusion_list"))
.or_else(|| starts_with("v1/validator/attestation_data"))
.or_else(|| starts_with("v1/validator/aggregate_attestation"))
.or_else(|| starts_with("v2/validator/aggregate_attestation"))
.or_else(|| starts_with("v1/validator/aggregate_and_proofs"))
.or_else(|| starts_with("v2/validator/aggregate_and_proofs"))
.or_else(|| starts_with("v1/validator/sync_committee_contribution"))
.or_else(|| starts_with("v1/validator/inclusion_list"))
.or_else(|| starts_with("v1/validator/contribution_and_proofs"))
.or_else(|| starts_with("v1/validator/beacon_committee_subscriptions"))
.or_else(|| starts_with("v1/validator/sync_committee_subscriptions"))
.or_else(|| starts_with("v1/beacon/blob_sidecars"))
.or_else(|| starts_with("v1/beacon/blocks/head/root"))
.or_else(|| starts_with("v1/beacon/blinded_blocks"))
.or_else(|| starts_with("v1/beacon/deposit_snapshot"))
.or_else(|| starts_with("v1/beacon/headers"))
.or_else(|| starts_with("v1/beacon/light_client"))
.or_else(|| starts_with("v1/beacon/pool/attestations"))
.or_else(|| starts_with("v2/beacon/pool/attestations"))
.or_else(|| starts_with("v1/beacon/pool/attester_slashings"))
.or_else(|| starts_with("v1/beacon/pool/bls_to_execution_changes"))
.or_else(|| starts_with("v1/beacon/pool/proposer_slashings"))
.or_else(|| starts_with("v1/beacon/pool/sync_committees"))
.or_else(|| starts_with("v1/beacon/blocks/head/root"))
.or_else(|| starts_with("v1/validator/prepare_beacon_proposer"))
.or_else(|| starts_with("v1/validator/register_validator"))
.or_else(|| starts_with("v1/beacon/pool/voluntary_exits"))
.or_else(|| starts_with("v1/beacon/rewards/blocks"))
.or_else(|| starts_with("v1/beacon/rewards/attestations"))
.or_else(|| starts_with("v1/beacon/rewards/sync_committee"))
.or_else(|| starts_with("v1/beacon/rewards"))
.or_else(|| starts_with("v1/beacon/states"))
.or_else(|| starts_with("v1/beacon/"))
.or_else(|| starts_with("v2/beacon/"))
.or_else(|| starts_with("v1/builder/states"))
.or_else(|| starts_with("v1/config/deposit_contract"))
.or_else(|| starts_with("v1/config/fork_schedule"))
.or_else(|| starts_with("v1/config/spec"))
.or_else(|| starts_with("v1/config/"))
.or_else(|| starts_with("v1/debug/"))
.or_else(|| starts_with("v2/debug/"))
.or_else(|| starts_with("v1/events"))
.or_else(|| starts_with("v1/events/"))
.or_else(|| starts_with("v1/node/"))
.or_else(|| starts_with("v1/node/health"))
.or_else(|| starts_with("v1/node/identity"))
.or_else(|| starts_with("v1/node/peers"))
.or_else(|| starts_with("v1/node/peer_count"))
.or_else(|| starts_with("v1/node/syncing"))
.or_else(|| starts_with("v1/node/version"))
.or_else(|| starts_with("v1/node"))
.or_else(|| starts_with("v1/validator/aggregate_and_proofs"))
.or_else(|| starts_with("v2/validator/aggregate_and_proofs"))
.or_else(|| starts_with("v1/validator/aggregate_attestation"))
.or_else(|| starts_with("v2/validator/aggregate_attestation"))
.or_else(|| starts_with("v1/validator/attestation_data"))
.or_else(|| starts_with("v1/validator/beacon_committee_subscriptions"))
.or_else(|| starts_with("v1/validator/blinded_blocks"))
.or_else(|| starts_with("v2/validator/blinded_blocks"))
.or_else(|| starts_with("v1/validator/blocks"))
.or_else(|| starts_with("v2/validator/blocks"))
.or_else(|| starts_with("v3/validator/blocks"))
.or_else(|| starts_with("v1/validator/contribution_and_proofs"))
.or_else(|| starts_with("v1/validator/duties/attester"))
.or_else(|| starts_with("v1/validator/duties/proposer"))
.or_else(|| starts_with("v1/validator/duties/sync"))
.or_else(|| starts_with("v1/validator/inclusion_list"))
.or_else(|| starts_with("v1/validator/liveness"))
.or_else(|| starts_with("v1/validator/prepare_beacon_proposer"))
.or_else(|| starts_with("v1/validator/register_validator"))
.or_else(|| starts_with("v1/validator/sync_committee_contribution"))
.or_else(|| starts_with("v1/validator/sync_committee_subscriptions"))
.or_else(|| starts_with("v1/validator/"))
.or_else(|| starts_with("v2/validator/"))
.or_else(|| starts_with("v3/validator/"))
.or_else(|| starts_with("lighthouse"))
.unwrap_or("other")
};
@@ -259,6 +290,38 @@ pub fn prometheus_metrics() -> warp::filters::log::Log<impl Fn(warp::filters::lo
})
}
/// Creates a `warp` logging wrapper which we use to create `tracing` logs.
pub fn tracing_logging() -> warp::filters::log::Log<impl Fn(warp::filters::log::Info) + Clone> {
warp::log::custom(move |info| {
let status = info.status();
// Ensure elapsed time is in milliseconds.
let elapsed = info.elapsed().as_secs_f64() * 1000.0;
let path = info.path();
let method = info.method().to_string();
if status == StatusCode::OK
|| status == StatusCode::NOT_FOUND
|| status == StatusCode::PARTIAL_CONTENT
{
debug!(
elapsed_ms = %elapsed,
status = %status,
path = %path,
method = %method,
"Processed HTTP API request"
);
} else {
warn!(
elapsed_ms = %elapsed,
status = %status,
path = %path,
method = %method,
"Error processing HTTP API request"
);
}
})
}
/// Creates a server that will serve requests using information from `ctx`.
///
/// The server will shut down gracefully when the `shutdown` future resolves.
@@ -650,7 +713,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("validator_balances"))
.and(warp::path::end())
.and(warp_utils::json::json())
.and(warp_utils::json::json_no_body())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
@@ -1093,8 +1156,8 @@ pub fn serve<T: BeaconChainTypes>(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (data, execution_optimistic, finalized) = state_id
task_spawner.blocking_response_task(Priority::P1, move || {
let (data, execution_optimistic, finalized, fork_name) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
@@ -1104,15 +1167,23 @@ pub fn serve<T: BeaconChainTypes>(
));
};
Ok((deposits.clone(), execution_optimistic, finalized))
Ok((
deposits.clone(),
execution_optimistic,
finalized,
state.fork_name_unchecked(),
))
},
)?;
Ok(api_types::ExecutionOptimisticFinalizedResponse {
execution_optimistic_finalized_beacon_response(
ResponseIncludesVersion::Yes(fork_name),
execution_optimistic,
finalized,
data,
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
})
)
.map(|res| warp::reply::json(&res).into_response())
.map(|resp| add_consensus_version_header(resp, fork_name))
})
},
);
@@ -1126,8 +1197,8 @@ pub fn serve<T: BeaconChainTypes>(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (data, execution_optimistic, finalized) = state_id
task_spawner.blocking_response_task(Priority::P1, move || {
let (data, execution_optimistic, finalized, fork_name) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
@@ -1137,7 +1208,48 @@ pub fn serve<T: BeaconChainTypes>(
));
};
Ok((withdrawals.clone(), execution_optimistic, finalized))
Ok((
withdrawals.clone(),
execution_optimistic,
finalized,
state.fork_name_unchecked(),
))
},
)?;
execution_optimistic_finalized_beacon_response(
ResponseIncludesVersion::Yes(fork_name),
execution_optimistic,
finalized,
data,
)
.map(|res| warp::reply::json(&res).into_response())
.map(|resp| add_consensus_version_header(resp, fork_name))
})
},
);
// GET beacon/states/{state_id}/pending_consolidations
let get_beacon_state_pending_consolidations = beacon_states_path
.clone()
.and(warp::path("pending_consolidations"))
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (data, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
let Ok(consolidations) = state.pending_consolidations() else {
return Err(warp_utils::reject::custom_bad_request(
"Pending consolidations not found".to_string(),
));
};
Ok((consolidations.clone(), execution_optimistic, finalized))
},
)?;
@@ -1312,21 +1424,30 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("beacon"))
.and(warp::path("blocks"))
.and(warp::path::end())
.and(warp_utils::json::json())
.and(warp::body::json())
.and(consensus_version_header_filter)
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(network_globals.clone())
.then(
move |block_contents: PublishBlockRequest<T::EthSpec>,
move |value: serde_json::Value,
consensus_version: ForkName,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let request = PublishBlockRequest::<T::EthSpec>::context_deserialize(
&value,
consensus_version,
)
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!("invalid JSON: {e:?}"))
})?;
publish_blocks::publish_block(
None,
ProvenancedBlock::local_from_publish_request(block_contents),
ProvenancedBlock::local_from_publish_request(request),
chain,
&network_tx,
BroadcastValidation::default(),
@@ -1382,22 +1503,32 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("blocks"))
.and(warp::query::<api_types::BroadcastValidationQuery>())
.and(warp::path::end())
.and(warp_utils::json::json())
.and(warp::body::json())
.and(consensus_version_header_filter)
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(network_globals.clone())
.then(
move |validation_level: api_types::BroadcastValidationQuery,
block_contents: PublishBlockRequest<T::EthSpec>,
value: serde_json::Value,
consensus_version: ForkName,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let request = PublishBlockRequest::<T::EthSpec>::context_deserialize(
&value,
consensus_version,
)
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!("invalid JSON: {e:?}"))
})?;
publish_blocks::publish_block(
None,
ProvenancedBlock::local_from_publish_request(block_contents),
ProvenancedBlock::local_from_publish_request(request),
chain,
&network_tx,
validation_level.broadcast_validation,
@@ -1630,6 +1761,12 @@ pub fn serve<T: BeaconChainTypes>(
.fork_name(&chain.spec)
.map_err(inconsistent_fork_rejection)?;
let require_version = match endpoint_version {
V1 => ResponseIncludesVersion::No,
V2 => ResponseIncludesVersion::Yes(fork_name),
_ => return Err(unsupported_version_rejection(endpoint_version)),
};
match accept_header {
Some(api_types::Accept::Ssz) => Response::builder()
.status(200)
@@ -1641,9 +1778,8 @@ pub fn serve<T: BeaconChainTypes>(
e
))
}),
_ => execution_optimistic_finalized_fork_versioned_response(
endpoint_version,
fork_name,
_ => execution_optimistic_finalized_beacon_response(
require_version,
execution_optimistic,
finalized,
block,
@@ -1703,9 +1839,15 @@ pub fn serve<T: BeaconChainTypes>(
.attestations()
.map(|att| att.clone_as_attestation())
.collect::<Vec<_>>();
let res = execution_optimistic_finalized_fork_versioned_response(
endpoint_version,
fork_name,
let require_version = match endpoint_version {
V1 => ResponseIncludesVersion::No,
V2 => ResponseIncludesVersion::Yes(fork_name),
_ => return Err(unsupported_version_rejection(endpoint_version)),
};
let res = execution_optimistic_finalized_beacon_response(
require_version,
execution_optimistic,
finalized,
&atts,
@@ -1752,9 +1894,8 @@ pub fn serve<T: BeaconChainTypes>(
}),
_ => {
// Post as a V2 endpoint so we return the fork version.
execution_optimistic_finalized_fork_versioned_response(
V2,
fork_name,
execution_optimistic_finalized_beacon_response(
ResponseIncludesVersion::Yes(fork_name),
execution_optimistic,
finalized,
block,
@@ -1808,9 +1949,8 @@ pub fn serve<T: BeaconChainTypes>(
}),
_ => {
// Post as a V2 endpoint so we return the fork version.
let res = execution_optimistic_finalized_fork_versioned_response(
V2,
fork_name,
let res = execution_optimistic_finalized_beacon_response(
ResponseIncludesVersion::Yes(fork_name),
execution_optimistic,
finalized,
&blob_sidecar_list_filtered,
@@ -1932,11 +2072,11 @@ pub fn serve<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
query: api_types::AttestationPoolQuery| {
task_spawner.blocking_response_task(Priority::P1, move || {
let query_filter = |data: &AttestationData| {
let query_filter = |data: &AttestationData, committee_indices: HashSet<u64>| {
query.slot.is_none_or(|slot| slot == data.slot)
&& query
.committee_index
.is_none_or(|index| index == data.index)
.is_none_or(|index| committee_indices.contains(&index))
};
let mut attestations = chain.op_pool.get_filtered_attestations(query_filter);
@@ -1945,7 +2085,9 @@ pub fn serve<T: BeaconChainTypes>(
.naive_aggregation_pool
.read()
.iter()
.filter(|&att| query_filter(att.data()))
.filter(|&att| {
query_filter(att.data(), att.get_committee_indices_map())
})
.cloned(),
);
// Use the current slot to find the fork version, and convert all messages to the
@@ -1968,7 +2110,13 @@ pub fn serve<T: BeaconChainTypes>(
})
.collect::<Vec<_>>();
let res = fork_versioned_response(endpoint_version, fork_name, &attestations)?;
let require_version = match endpoint_version {
V1 => ResponseIncludesVersion::No,
V2 => ResponseIncludesVersion::Yes(fork_name),
_ => return Err(unsupported_version_rejection(endpoint_version)),
};
let res = beacon_response(require_version, &attestations);
Ok(add_consensus_version_header(
warp::reply::json(&res).into_response(),
fork_name,
@@ -2057,7 +2205,13 @@ pub fn serve<T: BeaconChainTypes>(
})
.collect::<Vec<_>>();
let res = fork_versioned_response(endpoint_version, fork_name, &slashings)?;
let require_version = match endpoint_version {
V1 => ResponseIncludesVersion::No,
V2 => ResponseIncludesVersion::Yes(fork_name),
_ => return Err(unsupported_version_rejection(endpoint_version)),
};
let res = beacon_response(require_version, &slashings);
Ok(add_consensus_version_header(
warp::reply::json(&res).into_response(),
fork_name,
@@ -2521,7 +2675,7 @@ pub fn serve<T: BeaconChainTypes>(
let fork_name = chain
.spec
.fork_name_at_slot::<T::EthSpec>(*update.signature_slot());
.fork_name_at_slot::<T::EthSpec>(update.get_slot());
match accept_header {
Some(api_types::Accept::Ssz) => Response::builder()
.status(200)
@@ -2533,11 +2687,10 @@ pub fn serve<T: BeaconChainTypes>(
e
))
}),
_ => Ok(warp::reply::json(&ForkVersionedResponse {
version: Some(fork_name),
metadata: EmptyMetadata {},
data: update,
})
_ => Ok(warp::reply::json(&beacon_response(
ResponseIncludesVersion::Yes(fork_name),
update,
))
.into_response()),
}
.map(|resp| add_consensus_version_header(resp, fork_name))
@@ -2582,11 +2735,10 @@ pub fn serve<T: BeaconChainTypes>(
e
))
}),
_ => Ok(warp::reply::json(&ForkVersionedResponse {
version: Some(fork_name),
metadata: EmptyMetadata {},
data: update,
})
_ => Ok(warp::reply::json(&beacon_response(
ResponseIncludesVersion::Yes(fork_name),
update,
))
.into_response()),
}
.map(|resp| add_consensus_version_header(resp, fork_name))
@@ -2778,7 +2930,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
|endpoint_version: EndpointVersion,
|_endpoint_version: EndpointVersion,
state_id: StateId,
accept_header: Option<api_types::Accept>,
task_spawner: TaskSpawner<T::EthSpec>,
@@ -2822,9 +2974,8 @@ pub fn serve<T: BeaconChainTypes>(
let fork_name = state
.fork_name(&chain.spec)
.map_err(inconsistent_fork_rejection)?;
let res = execution_optimistic_finalized_fork_versioned_response(
endpoint_version,
fork_name,
let res = execution_optimistic_finalized_beacon_response(
ResponseIncludesVersion::Yes(fork_name),
execution_optimistic,
finalized,
&state,
@@ -3174,13 +3325,14 @@ pub fn serve<T: BeaconChainTypes>(
let direction = dir.into();
let state = peer_info.connection_status().clone().into();
let state_matches = query.state.as_ref().is_none_or(|states| {
states.iter().any(|state_param| *state_param == state)
});
let direction_matches =
query.direction.as_ref().is_none_or(|directions| {
directions.iter().any(|dir_param| *dir_param == direction)
});
let state_matches = query
.state
.as_ref()
.is_none_or(|states| states.contains(&state));
let direction_matches = query
.direction
.as_ref()
.is_none_or(|directions| directions.contains(&direction));
if state_matches && direction_matches {
peers.push(api_types::PeerData {
@@ -3304,7 +3456,7 @@ pub fn serve<T: BeaconChainTypes>(
if endpoint_version == V3 {
produce_block_v3(accept_header, chain, slot, query).await
} else {
produce_block_v2(endpoint_version, accept_header, chain, slot, query).await
produce_block_v2(accept_header, chain, slot, query).await
}
})
},
@@ -3334,8 +3486,7 @@ pub fn serve<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
not_synced_filter?;
produce_blinded_block_v2(EndpointVersion(2), accept_header, chain, slot, query)
.await
produce_blinded_block_v2(accept_header, chain, slot, query).await
})
},
);
@@ -4848,6 +4999,7 @@ pub fn serve<T: BeaconChainTypes>(
.uor(get_beacon_state_randao)
.uor(get_beacon_state_pending_deposits)
.uor(get_beacon_state_pending_partial_withdrawals)
.uor(get_beacon_state_pending_consolidations)
.uor(get_beacon_headers)
.uor(get_beacon_headers_block_id)
.uor(get_beacon_block)
@@ -4960,6 +5112,7 @@ pub fn serve<T: BeaconChainTypes>(
),
)
.recover(warp_utils::reject::handle_rejection)
.with(tracing_logging())
.with(prometheus_metrics())
// Add a `Server` header.
.map(|reply| warp::reply::with_header(reply, "Server", &version_with_platform()))

View File

@@ -1,14 +1,15 @@
use crate::version::{
add_consensus_version_header, add_ssz_content_type_header, fork_versioned_response, V1,
add_consensus_version_header, add_ssz_content_type_header, beacon_response,
ResponseIncludesVersion,
};
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
use eth2::types::{
self as api_types, ChainSpec, ForkVersionedResponse, LightClientUpdate,
LightClientUpdateResponseChunk, LightClientUpdateSszResponse, LightClientUpdatesQuery,
self as api_types, ChainSpec, LightClientUpdate, LightClientUpdateResponseChunk,
LightClientUpdateResponseChunkInner, LightClientUpdatesQuery,
};
use ssz::Encode;
use std::sync::Arc;
use types::{ForkName, Hash256, LightClientBootstrap};
use types::{BeaconResponse, ForkName, Hash256, LightClientBootstrap};
use warp::{
hyper::{Body, Response},
reply::Reply,
@@ -37,15 +38,9 @@ pub fn get_light_client_updates<T: BeaconChainTypes>(
.map(|update| map_light_client_update_to_ssz_chunk::<T>(&chain, update))
.collect::<Vec<LightClientUpdateResponseChunk>>();
let ssz_response = LightClientUpdateSszResponse {
response_chunk_len: (light_client_updates.len() as u64).to_le_bytes().to_vec(),
response_chunk: response_chunks.as_ssz_bytes(),
}
.as_ssz_bytes();
Response::builder()
.status(200)
.body(ssz_response)
.body(response_chunks.as_ssz_bytes())
.map(|res: Response<Vec<u8>>| add_ssz_content_type_header(res))
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
@@ -58,7 +53,7 @@ pub fn get_light_client_updates<T: BeaconChainTypes>(
let fork_versioned_response = light_client_updates
.iter()
.map(|update| map_light_client_update_to_json_response::<T>(&chain, update.clone()))
.collect::<Result<Vec<ForkVersionedResponse<LightClientUpdate<T::EthSpec>>>, Rejection>>()?;
.collect::<Vec<BeaconResponse<LightClientUpdate<T::EthSpec>>>>();
Ok(warp::reply::json(&fork_versioned_response).into_response())
}
}
@@ -94,10 +89,8 @@ pub fn get_light_client_bootstrap<T: BeaconChainTypes>(
warp_utils::reject::custom_server_error(format!("failed to create response: {}", e))
}),
_ => {
let fork_versioned_response = map_light_client_bootstrap_to_json_response::<T>(
fork_name,
light_client_bootstrap,
)?;
let fork_versioned_response =
map_light_client_bootstrap_to_json_response::<T>(fork_name, light_client_bootstrap);
Ok(warp::reply::json(&fork_versioned_response).into_response())
}
}
@@ -159,33 +152,44 @@ fn map_light_client_update_to_ssz_chunk<T: BeaconChainTypes>(
) -> LightClientUpdateResponseChunk {
let fork_name = chain
.spec
.fork_name_at_slot::<T::EthSpec>(*light_client_update.signature_slot());
.fork_name_at_slot::<T::EthSpec>(light_client_update.attested_header_slot());
let fork_digest = ChainSpec::compute_fork_digest(
chain.spec.fork_version_for_name(fork_name),
chain.genesis_validators_root,
);
LightClientUpdateResponseChunk {
let payload = light_client_update.as_ssz_bytes();
let response_chunk_len = fork_digest.len() + payload.len();
let response_chunk = LightClientUpdateResponseChunkInner {
context: fork_digest,
payload: light_client_update.as_ssz_bytes(),
payload,
};
LightClientUpdateResponseChunk {
response_chunk_len: response_chunk_len as u64,
response_chunk,
}
}
fn map_light_client_bootstrap_to_json_response<T: BeaconChainTypes>(
fork_name: ForkName,
light_client_bootstrap: LightClientBootstrap<T::EthSpec>,
) -> Result<ForkVersionedResponse<LightClientBootstrap<T::EthSpec>>, Rejection> {
fork_versioned_response(V1, fork_name, light_client_bootstrap)
) -> BeaconResponse<LightClientBootstrap<T::EthSpec>> {
beacon_response(
ResponseIncludesVersion::Yes(fork_name),
light_client_bootstrap,
)
}
fn map_light_client_update_to_json_response<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
light_client_update: LightClientUpdate<T::EthSpec>,
) -> Result<ForkVersionedResponse<LightClientUpdate<T::EthSpec>>, Rejection> {
) -> BeaconResponse<LightClientUpdate<T::EthSpec>> {
let fork_name = chain
.spec
.fork_name_at_slot::<T::EthSpec>(*light_client_update.signature_slot());
fork_versioned_response(V1, fork_name, light_client_update)
beacon_response(ResponseIncludesVersion::Yes(fork_name), light_client_update)
}

View File

@@ -3,15 +3,14 @@ use crate::{
version::{
add_consensus_block_value_header, add_consensus_version_header,
add_execution_payload_blinded_header, add_execution_payload_value_header,
add_ssz_content_type_header, fork_versioned_response, inconsistent_fork_rejection,
add_ssz_content_type_header, beacon_response, inconsistent_fork_rejection,
ResponseIncludesVersion,
},
};
use beacon_chain::{
BeaconBlockResponseWrapper, BeaconChain, BeaconChainTypes, ProduceBlockVerification,
};
use eth2::types::{
self as api_types, EndpointVersion, ProduceBlockV3Metadata, SkipRandaoVerification,
};
use eth2::types::{self as api_types, ProduceBlockV3Metadata, SkipRandaoVerification};
use ssz::Encode;
use std::sync::Arc;
use types::{payload::BlockProductionVersion, *};
@@ -115,7 +114,7 @@ pub fn build_response_v3<T: BeaconChainTypes>(
warp_utils::reject::custom_server_error(format!("failed to create response: {}", e))
}),
_ => Ok(warp::reply::json(&ForkVersionedResponse {
version: Some(fork_name),
version: fork_name,
metadata,
data: block_contents,
})
@@ -129,7 +128,6 @@ pub fn build_response_v3<T: BeaconChainTypes>(
}
pub async fn produce_blinded_block_v2<T: BeaconChainTypes>(
endpoint_version: EndpointVersion,
accept_header: Option<api_types::Accept>,
chain: Arc<BeaconChain<T>>,
slot: Slot,
@@ -155,11 +153,10 @@ pub async fn produce_blinded_block_v2<T: BeaconChainTypes>(
.await
.map_err(warp_utils::reject::unhandled_error)?;
build_response_v2(chain, block_response_type, endpoint_version, accept_header)
build_response_v2(chain, block_response_type, accept_header)
}
pub async fn produce_block_v2<T: BeaconChainTypes>(
endpoint_version: EndpointVersion,
accept_header: Option<api_types::Accept>,
chain: Arc<BeaconChain<T>>,
slot: Slot,
@@ -186,13 +183,12 @@ pub async fn produce_block_v2<T: BeaconChainTypes>(
.await
.map_err(warp_utils::reject::unhandled_error)?;
build_response_v2(chain, block_response_type, endpoint_version, accept_header)
build_response_v2(chain, block_response_type, accept_header)
}
pub fn build_response_v2<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
block_response: BeaconBlockResponseWrapper<T::EthSpec>,
endpoint_version: EndpointVersion,
accept_header: Option<api_types::Accept>,
) -> Result<Response<Body>, warp::Rejection> {
let fork_name = block_response
@@ -210,8 +206,10 @@ pub fn build_response_v2<T: BeaconChainTypes>(
.map_err(|e| {
warp_utils::reject::custom_server_error(format!("failed to create response: {}", e))
}),
_ => fork_versioned_response(endpoint_version, fork_name, block_contents)
.map(|response| warp::reply::json(&response).into_response())
.map(|res| add_consensus_version_header(res, fork_name)),
_ => Ok(warp::reply::json(&beacon_response(
ResponseIncludesVersion::Yes(fork_name),
block_contents,
))
.into_response()),
}
}

View File

@@ -60,13 +60,13 @@ use types::{Attestation, EthSpec, ForkName, SingleAttestation};
pub enum Error {
Validation(AttestationError),
Publication,
ForkChoice(#[allow(dead_code)] BeaconChainError),
ForkChoice(#[allow(dead_code)] Box<BeaconChainError>),
AggregationPool(#[allow(dead_code)] AttestationError),
ReprocessDisabled,
ReprocessFull,
ReprocessTimeout,
InvalidJson(#[allow(dead_code)] serde_json::Error),
FailedConversion(#[allow(dead_code)] BeaconChainError),
FailedConversion(#[allow(dead_code)] Box<BeaconChainError>),
}
enum PublishAttestationResult {
@@ -164,7 +164,7 @@ fn verify_and_publish_attestation<T: BeaconChainTypes>(
}
if let Err(e) = fc_result {
Err(Error::ForkChoice(e))
Err(Error::ForkChoice(Box::new(e)))
} else if let Err(e) = naive_aggregation_result {
Err(Error::AggregationPool(e))
} else {
@@ -213,7 +213,7 @@ fn convert_to_attestation<'a, T: BeaconChainTypes>(
beacon_block_root,
}))
}
Err(e) => Err(Error::FailedConversion(e)),
Err(e) => Err(Error::FailedConversion(Box::new(e))),
}
}
}

View File

@@ -2,7 +2,7 @@ use crate::metrics;
use std::future::Future;
use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::block_verification_types::{AsBlock, RpcBlock};
use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use beacon_chain::validator_monitor::{get_block_delay_ms, timestamp_now};
use beacon_chain::{
@@ -123,8 +123,9 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>>(
"Signed block published to network via HTTP API"
);
crate::publish_pubsub_message(&sender, PubsubMessage::BeaconBlock(block.clone()))
.map_err(|_| BlockError::BeaconChainError(BeaconChainError::UnableToPublish))?;
crate::publish_pubsub_message(&sender, PubsubMessage::BeaconBlock(block.clone())).map_err(
|_| BlockError::BeaconChainError(Box::new(BeaconChainError::UnableToPublish)),
)?;
Ok(())
};
@@ -302,7 +303,11 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>>(
);
let import_result = Box::pin(chain.process_block(
block_root,
block.clone(),
RpcBlock::new_without_blobs(
Some(block_root),
block.clone(),
network_globals.custody_columns_count() as usize,
),
NotifyExecutionLayer::Yes,
BlockImportSource::HttpApi,
publish_fn,
@@ -364,7 +369,7 @@ fn spawn_build_data_sidecar_task<T: BeaconChainTypes>(
} else {
// Post PeerDAS: construct data columns.
let gossip_verified_data_columns =
build_gossip_verified_data_columns(&chain, &block, blobs)?;
build_gossip_verified_data_columns(&chain, &block, blobs, kzg_proofs)?;
Ok((vec![], gossip_verified_data_columns))
}
},
@@ -383,10 +388,11 @@ fn build_gossip_verified_data_columns<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
block: &SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>,
blobs: BlobsList<T::EthSpec>,
kzg_cell_proofs: KzgProofs<T::EthSpec>,
) -> Result<Vec<Option<GossipVerifiedDataColumn<T>>>, Rejection> {
let slot = block.slot();
let data_column_sidecars =
build_blob_data_column_sidecars(chain, block, blobs).map_err(|e| {
build_blob_data_column_sidecars(chain, block, blobs, kzg_cell_proofs).map_err(|e| {
error!(
error = ?e,
%slot,
@@ -501,7 +507,7 @@ fn publish_blob_sidecars<T: BeaconChainTypes>(
) -> Result<(), BlockError> {
let pubsub_message = PubsubMessage::BlobSidecar(Box::new((blob.index(), blob.clone_blob())));
crate::publish_pubsub_message(sender_clone, pubsub_message)
.map_err(|_| BlockError::BeaconChainError(BeaconChainError::UnableToPublish))
.map_err(|_| BlockError::BeaconChainError(Box::new(BeaconChainError::UnableToPublish)))
}
fn publish_column_sidecars<T: BeaconChainTypes>(
@@ -520,7 +526,7 @@ fn publish_column_sidecars<T: BeaconChainTypes>(
.len()
.saturating_sub(malicious_withhold_count);
// Randomize columns before dropping the last malicious_withhold_count items
data_column_sidecars.shuffle(&mut rand::thread_rng());
data_column_sidecars.shuffle(&mut **chain.rng.lock());
data_column_sidecars.truncate(columns_to_keep);
}
let pubsub_messages = data_column_sidecars
@@ -531,7 +537,7 @@ fn publish_column_sidecars<T: BeaconChainTypes>(
})
.collect::<Vec<_>>();
crate::publish_pubsub_messages(sender_clone, pubsub_messages)
.map_err(|_| BlockError::BeaconChainError(BeaconChainError::UnableToPublish))
.map_err(|_| BlockError::BeaconChainError(Box::new(BeaconChainError::UnableToPublish)))
}
async fn post_block_import_logging_and_response<T: BeaconChainTypes>(
@@ -588,7 +594,9 @@ async fn post_block_import_logging_and_response<T: BeaconChainTypes>(
Err(warp_utils::reject::custom_bad_request(msg))
}
}
Err(BlockError::BeaconChainError(BeaconChainError::UnableToPublish)) => {
Err(BlockError::BeaconChainError(e))
if matches!(e.as_ref(), BeaconChainError::UnableToPublish) =>
{
Err(warp_utils::reject::custom_server_error(
"unable to publish to network channel".to_string(),
))
@@ -784,7 +792,7 @@ fn check_slashable<T: BeaconChainTypes>(
block_clone.message().proposer_index(),
block_root,
)
.map_err(|e| BlockError::BeaconChainError(e.into()))?
.map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))?
{
warn!(
slot = %block_clone.slot(),

View File

@@ -59,7 +59,7 @@ pub fn sync_committee_duties<T: BeaconChainTypes>(
}
let duties = duties_from_state_load(request_epoch, request_indices, altair_fork_epoch, chain)
.map_err(|e| match e {
.map_err(|e| match *e {
BeaconChainError::SyncDutiesError(BeaconStateError::SyncCommitteeNotKnown {
current_epoch,
..
@@ -81,7 +81,7 @@ fn duties_from_state_load<T: BeaconChainTypes>(
request_indices: &[u64],
altair_fork_epoch: Epoch,
chain: &BeaconChain<T>,
) -> Result<Vec<Result<Option<SyncDuty>, BeaconStateError>>, BeaconChainError> {
) -> Result<Vec<Result<Option<SyncDuty>, BeaconStateError>>, Box<BeaconChainError>> {
// Determine what the current epoch would be if we fast-forward our system clock by
// `MAXIMUM_GOSSIP_CLOCK_DISPARITY`.
//
@@ -92,11 +92,17 @@ fn duties_from_state_load<T: BeaconChainTypes>(
let tolerant_current_epoch = chain
.slot_clock
.now_with_future_tolerance(chain.spec.maximum_gossip_clock_disparity())
.ok_or(BeaconChainError::UnableToReadSlot)?
.ok_or(BeaconChainError::UnableToReadSlot)
.map_err(Box::new)?
.epoch(T::EthSpec::slots_per_epoch());
let max_sync_committee_period = tolerant_current_epoch.sync_committee_period(&chain.spec)? + 1;
let sync_committee_period = request_epoch.sync_committee_period(&chain.spec)?;
let max_sync_committee_period = tolerant_current_epoch
.sync_committee_period(&chain.spec)
.map_err(|e| Box::new(e.into()))?
+ 1;
let sync_committee_period = request_epoch
.sync_committee_period(&chain.spec)
.map_err(|e| Box::new(e.into()))?;
if tolerant_current_epoch < altair_fork_epoch {
// Empty response if the epoch is pre-Altair.
@@ -119,13 +125,14 @@ fn duties_from_state_load<T: BeaconChainTypes>(
state
.get_sync_committee_duties(request_epoch, request_indices, &chain.spec)
.map_err(BeaconChainError::SyncDutiesError)
.map_err(Box::new)
} else {
Err(BeaconChainError::SyncDutiesError(
Err(Box::new(BeaconChainError::SyncDutiesError(
BeaconStateError::SyncCommitteeNotKnown {
current_epoch,
epoch: request_epoch,
},
))
)))
}
}

View File

@@ -7,9 +7,10 @@ pub fn pubkey_to_validator_index<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
state: &BeaconState<T::EthSpec>,
pubkey: &PublicKeyBytes,
) -> Result<Option<usize>, BeaconChainError> {
) -> Result<Option<usize>, Box<BeaconChainError>> {
chain
.validator_index(pubkey)?
.validator_index(pubkey)
.map_err(Box::new)?
.filter(|&index| {
state
.validators()

View File

@@ -81,8 +81,13 @@ pub fn get_beacon_state_validator_balances<T: BeaconChainTypes>(
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
let ids_filter_set: Option<HashSet<&ValidatorId>> =
optional_ids.map(|f| HashSet::from_iter(f.iter()));
let ids_filter_set: Option<HashSet<&ValidatorId>> = match optional_ids {
// if optional_ids (the request data body) is [], returns a `None`, so that later when calling .is_none_or() will return True
// Hence, all validators will pass through .filter(), and balances of all validators are returned, in accordance to the spec
Some([]) => None,
Some(ids) => Some(HashSet::from_iter(ids.iter())),
None => None,
};
Ok((
state

View File

@@ -5,10 +5,11 @@ use eth2::{
};
use serde::Serialize;
use types::{
fork_versioned_response::{
ExecutionOptimisticFinalizedForkVersionedResponse, ExecutionOptimisticFinalizedMetadata,
beacon_response::{
ExecutionOptimisticFinalizedBeaconResponse, ExecutionOptimisticFinalizedMetadata,
},
ForkName, ForkVersionedResponse, InconsistentFork, Uint256,
BeaconResponse, ForkName, ForkVersionedResponse, InconsistentFork, Uint256,
UnversionedResponse,
};
use warp::reply::{self, Reply, Response};
@@ -16,47 +17,54 @@ pub const V1: EndpointVersion = EndpointVersion(1);
pub const V2: EndpointVersion = EndpointVersion(2);
pub const V3: EndpointVersion = EndpointVersion(3);
pub fn fork_versioned_response<T: Serialize>(
endpoint_version: EndpointVersion,
fork_name: ForkName,
data: T,
) -> Result<ForkVersionedResponse<T>, warp::reject::Rejection> {
let fork_name = if endpoint_version == V1 {
None
} else if endpoint_version == V2 || endpoint_version == V3 {
Some(fork_name)
} else {
return Err(unsupported_version_rejection(endpoint_version));
};
Ok(ForkVersionedResponse {
version: fork_name,
metadata: Default::default(),
data,
})
#[derive(Debug, PartialEq, Clone, Serialize)]
pub enum ResponseIncludesVersion {
Yes(ForkName),
No,
}
pub fn execution_optimistic_finalized_fork_versioned_response<T: Serialize>(
endpoint_version: EndpointVersion,
fork_name: ForkName,
pub fn beacon_response<T: Serialize>(
require_version: ResponseIncludesVersion,
data: T,
) -> BeaconResponse<T> {
match require_version {
ResponseIncludesVersion::Yes(fork_name) => {
BeaconResponse::ForkVersioned(ForkVersionedResponse {
version: fork_name,
metadata: Default::default(),
data,
})
}
ResponseIncludesVersion::No => BeaconResponse::Unversioned(UnversionedResponse {
metadata: Default::default(),
data,
}),
}
}
pub fn execution_optimistic_finalized_beacon_response<T: Serialize>(
require_version: ResponseIncludesVersion,
execution_optimistic: bool,
finalized: bool,
data: T,
) -> Result<ExecutionOptimisticFinalizedForkVersionedResponse<T>, warp::reject::Rejection> {
let fork_name = if endpoint_version == V1 {
None
} else if endpoint_version == V2 {
Some(fork_name)
} else {
return Err(unsupported_version_rejection(endpoint_version));
) -> Result<ExecutionOptimisticFinalizedBeaconResponse<T>, warp::reject::Rejection> {
let metadata = ExecutionOptimisticFinalizedMetadata {
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
};
Ok(ExecutionOptimisticFinalizedForkVersionedResponse {
version: fork_name,
metadata: ExecutionOptimisticFinalizedMetadata {
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
},
data,
})
match require_version {
ResponseIncludesVersion::Yes(fork_name) => {
Ok(BeaconResponse::ForkVersioned(ForkVersionedResponse {
version: fork_name,
metadata,
data,
}))
}
ResponseIncludesVersion::No => Ok(BeaconResponse::Unversioned(UnversionedResponse {
metadata,
data,
})),
}
}
/// Add the 'Content-Type application/octet-stream` header to a response.