Merge branch 'unstable' into merge-unstable-to-deneb-20230816

# Conflicts:
#	beacon_node/http_api/src/lib.rs
This commit is contained in:
Jimmy Chen
2023-08-16 14:31:59 +10:00
26 changed files with 325 additions and 284 deletions

View File

@@ -516,7 +516,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let genesis_data = api_types::GenesisData {
@@ -549,7 +549,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("root"))
.and(warp::path::end())
.and_then(
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
@@ -570,7 +570,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("fork"))
.and(warp::path::end())
.and_then(
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
@@ -591,7 +591,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("finality_checkpoints"))
.and(warp::path::end())
.and_then(
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
@@ -627,7 +627,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("validator_balances"))
.and(warp::path::end())
.and(multi_key_query::<api_types::ValidatorBalancesQuery>())
.and_then(
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
@@ -685,7 +685,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("validators"))
.and(warp::path::end())
.and(multi_key_query::<api_types::ValidatorsQuery>())
.and_then(
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
@@ -769,7 +769,7 @@ pub fn serve<T: BeaconChainTypes>(
))
}))
.and(warp::path::end())
.and_then(
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
@@ -837,7 +837,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("committees"))
.and(warp::query::<api_types::CommitteesQuery>())
.and(warp::path::end())
.and_then(
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
@@ -1020,7 +1020,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("sync_committees"))
.and(warp::query::<api_types::SyncCommitteesQuery>())
.and(warp::path::end())
.and_then(
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
@@ -1086,7 +1086,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("randao"))
.and(warp::query::<api_types::RandaoQuery>())
.and(warp::path::end())
.and_then(
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
@@ -1128,7 +1128,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|query: api_types::HeadersQuery,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
@@ -1228,7 +1228,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|block_id: BlockId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
@@ -1276,7 +1276,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.and_then(
.then(
|block_contents: SignedBlockContents<T::EthSpec>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
@@ -1302,33 +1302,35 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("blocks"))
.and(warp::path::end())
.and(warp::body::bytes())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.and_then(
.then(
|block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| async move {
let block_contents = match SignedBlockContents::<T::EthSpec>::from_ssz_bytes(
&block_bytes,
&chain.spec,
) {
Ok(data) => data,
Err(e) => {
return Err(warp_utils::reject::custom_bad_request(format!("{:?}", e)))
}
};
publish_blocks::publish_block(
None,
ProvenancedBlock::local(block_contents),
chain,
&network_tx,
log,
BroadcastValidation::default(),
)
.await
.map(|()| warp::reply().into_response())
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block_contents = SignedBlockContents::<T::EthSpec>::from_ssz_bytes(
&block_bytes,
&chain.spec,
)
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}"))
})?;
publish_blocks::publish_block(
None,
ProvenancedBlock::local(block_contents),
chain,
&network_tx,
log,
BroadcastValidation::default(),
)
.await
.map(|()| warp::reply().into_response())
})
},
);
@@ -1349,8 +1351,8 @@ pub fn serve<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async(Priority::P1, async move {
match publish_blocks::publish_block(
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_block(
None,
ProvenancedBlock::local(block_contents),
chain,
@@ -1359,17 +1361,7 @@ pub fn serve<T: BeaconChainTypes>(
validation_level.broadcast_validation,
)
.await
{
Ok(()) => warp::reply().into_response(),
Err(e) => match warp_utils::reject::handle_rejection(e).await {
Ok(reply) => reply.into_response(),
Err(_) => warp::reply::with_status(
StatusCode::INTERNAL_SERVER_ERROR,
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response(),
},
}
.map(|()| warp::reply().into_response())
})
},
);
@@ -1380,53 +1372,41 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::query::<api_types::BroadcastValidationQuery>())
.and(warp::path::end())
.and(warp::body::bytes())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|validation_level: api_types::BroadcastValidationQuery,
block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| async move {
let block_contents = match SignedBlockContents::<T::EthSpec>::from_ssz_bytes(
&block_bytes,
&chain.spec,
) {
Ok(data) => data,
Err(_) => {
return warp::reply::with_status(
StatusCode::BAD_REQUEST,
eth2::StatusCode::BAD_REQUEST,
)
.into_response();
}
};
match publish_blocks::publish_block(
None,
ProvenancedBlock::local(block_contents),
chain,
&network_tx,
log,
validation_level.broadcast_validation,
)
.await
{
Ok(()) => warp::reply().into_response(),
Err(e) => match warp_utils::reject::handle_rejection(e).await {
Ok(reply) => reply.into_response(),
Err(_) => warp::reply::with_status(
StatusCode::INTERNAL_SERVER_ERROR,
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response(),
},
}
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block_contents = SignedBlockContents::<T::EthSpec>::from_ssz_bytes(
&block_bytes,
&chain.spec,
)
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}"))
})?;
publish_blocks::publish_block(
None,
ProvenancedBlock::local(block_contents),
chain,
&network_tx,
log,
validation_level.broadcast_validation,
)
.await
.map(|()| warp::reply().into_response())
})
},
);
/*
* beacon/blocks
* beacon/blinded_blocks
*/
// POST beacon/blinded_blocks
@@ -1439,7 +1419,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.and_then(
.then(
|block_contents: SignedBlockContents<T::EthSpec, BlindedPayload<_>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
@@ -1465,33 +1445,35 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("blinded_blocks"))
.and(warp::path::end())
.and(warp::body::bytes())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.and_then(
.then(
|block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| async move {
let block =
match SignedBlockContents::<T::EthSpec, BlindedPayload<_>>::from_ssz_bytes(
&block_bytes,
&chain.spec,
) {
Ok(data) => data,
Err(e) => {
return Err(warp_utils::reject::custom_bad_request(format!("{:?}", e)))
}
};
publish_blocks::publish_blinded_block(
block,
chain,
&network_tx,
log,
BroadcastValidation::default(),
)
.await
.map(|()| warp::reply().into_response())
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block =
SignedBlockContents::<T::EthSpec, BlindedPayload<_>>::from_ssz_bytes(
&block_bytes,
&chain.spec,
)
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}"))
})?;
publish_blocks::publish_blinded_block(
block,
chain,
&network_tx,
log,
BroadcastValidation::default(),
)
.await
.map(|()| warp::reply().into_response())
})
},
);
@@ -1612,7 +1594,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path::end())
.and(warp::header::optional::<api_types::Accept>("accept"))
.and_then(
.then(
|endpoint_version: EndpointVersion,
block_id: BlockId,
task_spawner: TaskSpawner<T::EthSpec>,
@@ -1655,7 +1637,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("root"))
.and(warp::path::end())
.and_then(
.then(
|block_id: BlockId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
@@ -1675,7 +1657,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("attestations"))
.and(warp::path::end())
.and_then(
.then(
|block_id: BlockId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
@@ -1699,7 +1681,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.and(warp::path::end())
.and(warp::header::optional::<api_types::Accept>("accept"))
.and_then(
.then(
|block_id: BlockId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
@@ -1798,7 +1780,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::body::json())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
attestations: Vec<Attestation<T::EthSpec>>,
@@ -1940,7 +1922,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("attestations"))
.and(warp::path::end())
.and(warp::query::<api_types::AttestationPoolQuery>())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query: api_types::AttestationPoolQuery| {
@@ -1973,7 +1955,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(warp::body::json())
.and(network_tx_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
slashing: AttesterSlashing<T::EthSpec>,
@@ -2015,7 +1997,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("attester_slashings"))
.and(warp::path::end())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let attestations = chain.op_pool.get_all_attester_slashings();
@@ -2031,7 +2013,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(warp::body::json())
.and(network_tx_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
slashing: ProposerSlashing,
@@ -2073,7 +2055,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("proposer_slashings"))
.and(warp::path::end())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let attestations = chain.op_pool.get_all_proposer_slashings();
@@ -2089,7 +2071,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(warp::body::json())
.and(network_tx_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
exit: SignedVoluntaryExit,
@@ -2129,7 +2111,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("voluntary_exits"))
.and(warp::path::end())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let attestations = chain.op_pool.get_all_voluntary_exits();
@@ -2146,7 +2128,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::body::json())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
signatures: Vec<SyncCommitteeMessage>,
@@ -2166,7 +2148,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("bls_to_execution_changes"))
.and(warp::path::end())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let address_changes = chain.op_pool.get_all_bls_to_execution_changes();
@@ -2183,7 +2165,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::body::json())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
address_changes: Vec<SignedBlsToExecutionChange>,
@@ -2275,7 +2257,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::header::optional::<api_types::Accept>("accept"))
.and(task_spawner_filter.clone())
.and(eth1_service_filter.clone())
.and_then(
.then(
|accept_header: Option<api_types::Accept>,
task_spawner: TaskSpawner<T::EthSpec>,
eth1_service: eth1::Service| {
@@ -2329,7 +2311,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("blocks"))
.and(block_id_or_err)
.and(warp::path::end())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
block_id: BlockId| {
@@ -2362,7 +2344,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::param::<Epoch>())
.and(warp::path::end())
.and(warp::body::json())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
epoch: Epoch,
@@ -2414,7 +2396,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(warp::body::json())
.and(log_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
block_id: BlockId,
@@ -2447,7 +2429,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let forks = ForkName::list_all()
@@ -2466,7 +2448,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
move |task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P0, move || {
let config_and_preset =
@@ -2482,7 +2464,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
Ok(api_types::GenericResponse::from(
@@ -2513,7 +2495,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::header::optional::<api_types::Accept>("accept"))
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|endpoint_version: EndpointVersion,
state_id: StateId,
accept_header: Option<api_types::Accept>,
@@ -2573,7 +2555,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|endpoint_version: EndpointVersion,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
@@ -2612,7 +2594,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let beacon_fork_choice = chain.canonical_head.fork_choice_read_lock();
@@ -2667,7 +2649,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(network_globals.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
@@ -2723,7 +2705,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(network_globals.clone())
.and(chain_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
chain: Arc<BeaconChain<T>>| {
@@ -2774,7 +2756,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(network_globals.clone())
.and(chain_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
chain: Arc<BeaconChain<T>>| {
@@ -2822,7 +2804,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(network_globals.clone())
.and_then(
.then(
|requested_peer_id: String,
task_spawner: TaskSpawner<T::EthSpec>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
@@ -2882,7 +2864,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(multi_key_query::<api_types::PeersQuery>())
.and(task_spawner_filter.clone())
.and(network_globals.clone())
.and_then(
.then(
|query_res: Result<api_types::PeersQuery, warp::Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
@@ -2952,7 +2934,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(network_globals.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
@@ -3005,7 +2987,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(log_filter.clone())
.and_then(
.then(
|epoch: Epoch,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
@@ -3031,7 +3013,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(log_filter.clone())
.and_then(
.then(
|endpoint_version: EndpointVersion,
slot: Slot,
query: api_types::ValidatorBlocksQuery,
@@ -3103,7 +3085,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::query::<api_types::ValidatorBlocksQuery>())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|slot: Slot,
query: api_types::ValidatorBlocksQuery,
task_spawner: TaskSpawner<T::EthSpec>,
@@ -3166,7 +3148,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(not_while_syncing_filter.clone())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|query: api_types::ValidatorAttestationDataQuery,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
@@ -3201,7 +3183,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(not_while_syncing_filter.clone())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|query: api_types::ValidatorAggregateAttestationQuery,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
@@ -3242,7 +3224,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::body::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|epoch: Epoch,
indices: api_types::ValidatorIndexData,
task_spawner: TaskSpawner<T::EthSpec>,
@@ -3253,7 +3235,7 @@ pub fn serve<T: BeaconChainTypes>(
},
);
// POST validator/duties/sync
// POST validator/duties/sync/{epoch}
let post_validator_duties_sync = eth_v1
.and(warp::path("validator"))
.and(warp::path("duties"))
@@ -3268,7 +3250,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::body::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|epoch: Epoch,
indices: api_types::ValidatorIndexData,
task_spawner: TaskSpawner<T::EthSpec>,
@@ -3288,7 +3270,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(not_while_syncing_filter.clone())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|sync_committee_data: SyncContributionData,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
@@ -3322,7 +3304,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::body::json())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
aggregates: Vec<SignedAggregateAndProof<T::EthSpec>>,
@@ -3435,7 +3417,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::body::json())
.and(network_tx_filter)
.and(log_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
contributions: Vec<SignedContributionAndProof<T::EthSpec>>,
@@ -3463,7 +3445,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(log_filter.clone())
.and_then(
.then(
|subscriptions: Vec<api_types::BeaconCommitteeSubscription>,
validator_subscription_tx: Sender<ValidatorSubscriptionMessage>,
task_spawner: TaskSpawner<T::EthSpec>,
@@ -3515,7 +3497,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.and(log_filter.clone())
.and(warp::body::json())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
log: Logger,
@@ -3566,15 +3548,15 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.and(log_filter.clone())
.and(warp::body::json())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
log: Logger,
register_val_data: Vec<SignedValidatorRegistrationData>| async {
let (tx, rx) = oneshot::channel();
task_spawner
.spawn_async_with_rejection(Priority::P0, async move {
let initial_result = task_spawner
.spawn_async_with_rejection_no_conversion(Priority::P0, async move {
let execution_layer = chain
.execution_layer
.as_ref()
@@ -3716,17 +3698,22 @@ pub fn serve<T: BeaconChainTypes>(
// from what is sent back down the channel.
Ok(warp::reply::reply().into_response())
})
.await?;
.await;
if initial_result.is_err() {
return task_spawner::convert_rejection(initial_result).await;
}
// Await a response from the builder without blocking a
// `BeaconProcessor` worker.
rx.await.unwrap_or_else(|_| {
task_spawner::convert_rejection(rx.await.unwrap_or_else(|_| {
Ok(warp::reply::with_status(
warp::reply::json(&"No response from channel"),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response())
})
}))
.await
},
);
// POST validator/sync_committee_subscriptions
@@ -3739,7 +3726,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(log_filter.clone())
.and_then(
.then(
|subscriptions: Vec<types::SyncCommitteeSubscription>,
validator_subscription_tx: Sender<ValidatorSubscriptionMessage>,
task_spawner: TaskSpawner<T::EthSpec>,
@@ -3783,7 +3770,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::body::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|epoch: Epoch,
indices: Vec<u64>,
task_spawner: TaskSpawner<T::EthSpec>,
@@ -3824,7 +3811,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::body::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|request_data: api_types::LivenessRequestData,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
@@ -3868,7 +3855,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("health"))
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and_then(|task_spawner: TaskSpawner<T::EthSpec>| {
.then(|task_spawner: TaskSpawner<T::EthSpec>| {
task_spawner.blocking_json_task(Priority::P0, move || {
eth2::lighthouse::Health::observe()
.map(api_types::GenericResponse::from)
@@ -3886,7 +3873,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(app_start_filter)
.and(data_dir_filter)
.and(network_globals.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
sysinfo,
app_start: std::time::Instant,
@@ -3911,7 +3898,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
ui::get_validator_count(chain).map(api_types::GenericResponse::from)
@@ -3927,7 +3914,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::body::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|request_data: ui::ValidatorMetricsRequestData,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
@@ -3946,7 +3933,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::body::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|request_data: ui::ValidatorInfoRequestData,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
@@ -3963,7 +3950,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(network_globals.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
task_spawner.blocking_json_task(Priority::P0, move || {
@@ -3979,7 +3966,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("nat"))
.and(task_spawner_filter.clone())
.and(warp::path::end())
.and_then(|task_spawner: TaskSpawner<T::EthSpec>| {
.then(|task_spawner: TaskSpawner<T::EthSpec>| {
task_spawner.blocking_json_task(Priority::P1, move || {
Ok(api_types::GenericResponse::from(
lighthouse_network::metrics::NAT_OPEN
@@ -3997,7 +3984,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(network_globals.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
@@ -4021,7 +4008,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(network_globals)
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
@@ -4044,7 +4031,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_response_task(Priority::P1, move || {
Ok::<_, warp::Rejection>(warp::reply::json(
@@ -4068,7 +4055,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|epoch: Epoch,
validator_id: ValidatorId,
task_spawner: TaskSpawner<T::EthSpec>,
@@ -4088,7 +4075,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|epoch: Epoch, task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
validator_inclusion::global_validator_inclusion_data(epoch, &chain)
@@ -4104,7 +4091,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let current_slot_opt = chain.slot().ok();
@@ -4137,7 +4124,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(eth1_service_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>, eth1_service: eth1::Service| {
task_spawner.blocking_json_task(Priority::P1, move || {
Ok(api_types::GenericResponse::from(
@@ -4159,7 +4146,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(eth1_service_filter)
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>, eth1_service: eth1::Service| {
task_spawner.blocking_json_task(Priority::P1, move || {
Ok(api_types::GenericResponse::from(
@@ -4184,7 +4171,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
@@ -4211,7 +4198,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
if chain.eth1_chain.is_some() {
@@ -4235,7 +4222,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || database::info(chain))
},
@@ -4248,7 +4235,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(not_while_syncing_filter)
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
chain.store_migrator.process_reconstruction();
@@ -4266,7 +4253,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(log_filter.clone())
.and_then(|query, task_spawner: TaskSpawner<T::EthSpec>, chain, log| {
.then(|query, task_spawner: TaskSpawner<T::EthSpec>, chain, log| {
task_spawner.blocking_json_task(Priority::P1, move || {
block_rewards::get_block_rewards(query, chain, log)
})
@@ -4281,7 +4268,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(log_filter.clone())
.and_then(
.then(
|blocks, task_spawner: TaskSpawner<T::EthSpec>, chain, log| {
task_spawner.blocking_json_task(Priority::P1, move || {
block_rewards::compute_block_rewards(blocks, chain, log)
@@ -4298,7 +4285,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|target, query, task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
attestation_performance::get_attestation_performance(target, query, chain)
@@ -4314,7 +4301,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|query, task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
block_packing_efficiency::get_block_packing_efficiency(query, chain)
@@ -4328,7 +4315,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.spawn_async_with_rejection(Priority::P1, async move {
let merge_readiness = chain.check_merge_readiness().await;
@@ -4346,7 +4333,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(multi_key_query::<api_types::EventQuery>())
.and(task_spawner_filter.clone())
.and(chain_filter)
.and_then(
.then(
|topics_res: Result<api_types::EventQuery, warp::Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
@@ -4423,7 +4410,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter)
.and(sse_component_filter)
.and_then(
.then(
|task_spawner: TaskSpawner<T::EthSpec>, sse_component: Option<SSELoggingComponents>| {
task_spawner.blocking_response_task(Priority::P1, move || {
if let Some(logging_components) = sse_component {

View File

@@ -35,6 +35,24 @@ pub struct TaskSpawner<E: EthSpec> {
beacon_processor_send: Option<BeaconProcessorSend<E>>,
}
/// Convert a warp `Rejection` into a `Response`.
///
/// This function should *always* be used to convert rejections into responses. This prevents warp
/// from trying to backtrack in strange ways. See: https://github.com/sigp/lighthouse/issues/3404
pub async fn convert_rejection<T: Reply>(res: Result<T, warp::Rejection>) -> Response {
match res {
Ok(response) => response.into_response(),
Err(e) => match warp_utils::reject::handle_rejection(e).await {
Ok(reply) => reply.into_response(),
Err(_) => warp::reply::with_status(
warp::reply::json(&"unhandled error"),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response(),
},
}
}
impl<E: EthSpec> TaskSpawner<E> {
pub fn new(beacon_processor_send: Option<BeaconProcessorSend<E>>) -> Self {
Self {
@@ -43,11 +61,7 @@ impl<E: EthSpec> TaskSpawner<E> {
}
/// Executes a "blocking" (non-async) task which returns a `Response`.
pub async fn blocking_response_task<F, T>(
self,
priority: Priority,
func: F,
) -> Result<Response, warp::Rejection>
pub async fn blocking_response_task<F, T>(self, priority: Priority, func: F) -> Response
where
F: FnOnce() -> Result<T, warp::Rejection> + Send + Sync + 'static,
T: Reply + Send + 'static,
@@ -65,31 +79,25 @@ impl<E: EthSpec> TaskSpawner<E> {
};
// Send the function to the beacon processor for execution at some arbitrary time.
match send_to_beacon_processor(
let result = send_to_beacon_processor(
beacon_processor_send,
priority,
BlockingOrAsync::Blocking(Box::new(process_fn)),
rx,
)
.await
{
Ok(result) => result.map(Reply::into_response),
Err(error_response) => Ok(error_response),
}
.and_then(|x| x);
convert_rejection(result).await
} else {
// There is no beacon processor so spawn a task directly on the
// tokio executor.
warp_utils::task::blocking_response_task(func).await
convert_rejection(warp_utils::task::blocking_response_task(func).await).await
}
}
/// Executes a "blocking" (non-async) task which returns a JSON-serializable
/// object.
pub async fn blocking_json_task<F, T>(
self,
priority: Priority,
func: F,
) -> Result<Response, warp::Rejection>
pub async fn blocking_json_task<F, T>(self, priority: Priority, func: F) -> Response
where
F: FnOnce() -> Result<T, warp::Rejection> + Send + Sync + 'static,
T: Serialize + Send + 'static,
@@ -98,11 +106,26 @@ impl<E: EthSpec> TaskSpawner<E> {
self.blocking_response_task(priority, func).await
}
/// Executes an async task which may return a `warp::Rejection`.
/// Executes an async task which may return a `Rejection`, which will be converted to a response.
pub async fn spawn_async_with_rejection(
self,
priority: Priority,
func: impl Future<Output = Result<Response, warp::Rejection>> + Send + Sync + 'static,
) -> Response {
let result = self
.spawn_async_with_rejection_no_conversion(priority, func)
.await;
convert_rejection(result).await
}
/// Same as `spawn_async_with_rejection` but returning a result with the unhandled rejection.
///
/// If you call this function you MUST convert the rejection to a response and not let it
/// propagate into Warp's filters. See `convert_rejection`.
pub async fn spawn_async_with_rejection_no_conversion(
self,
priority: Priority,
func: impl Future<Output = Result<Response, warp::Rejection>> + Send + Sync + 'static,
) -> Result<Response, warp::Rejection> {
if let Some(beacon_processor_send) = &self.beacon_processor_send {
// Create a wrapper future that will execute `func` and send the
@@ -124,18 +147,16 @@ impl<E: EthSpec> TaskSpawner<E> {
rx,
)
.await
.unwrap_or_else(Result::Ok)
.and_then(|x| x)
} else {
// There is no beacon processor so spawn a task directly on the
// tokio executor.
tokio::task::spawn(func).await.unwrap_or_else(|e| {
let response = warp::reply::with_status(
warp::reply::json(&format!("Tokio did not execute task: {e:?}")),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response();
Ok(response)
})
tokio::task::spawn(func)
.await
.map_err(|_| {
warp_utils::reject::custom_server_error("Tokio failed to spawn task".into())
})
.and_then(|x| x)
}
}
@@ -158,14 +179,14 @@ impl<E: EthSpec> TaskSpawner<E> {
};
// Send the function to the beacon processor for execution at some arbitrary time.
send_to_beacon_processor(
let result = send_to_beacon_processor(
beacon_processor_send,
priority,
BlockingOrAsync::Async(Box::pin(process_fn)),
rx,
)
.await
.unwrap_or_else(|error_response| error_response)
.await;
convert_rejection(result).await
} else {
// There is no beacon processor so spawn a task directly on the
// tokio executor.
@@ -182,14 +203,14 @@ impl<E: EthSpec> TaskSpawner<E> {
/// Send a task to the beacon processor and await execution.
///
/// If the task is not executed, return an `Err(response)` with an error message
/// If the task is not executed, return an `Err` with an error message
/// for the API consumer.
async fn send_to_beacon_processor<E: EthSpec, T>(
beacon_processor_send: &BeaconProcessorSend<E>,
priority: Priority,
process_fn: BlockingOrAsync,
rx: oneshot::Receiver<T>,
) -> Result<T, Response> {
) -> Result<T, warp::Rejection> {
let error_message = match beacon_processor_send.try_send(priority.work_event(process_fn)) {
Ok(()) => {
match rx.await {
@@ -205,10 +226,7 @@ async fn send_to_beacon_processor<E: EthSpec, T>(
Err(TrySendError::Closed(_)) => "The task was dropped. The server is shutting down.",
};
let error_response = warp::reply::with_status(
warp::reply::json(&error_message),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response();
Err(error_response)
Err(warp_utils::reject::custom_server_error(
error_message.to_string(),
))
}