Start wiring task spawner into routes

This commit is contained in:
Paul Hauner
2023-07-04 15:50:49 +10:00
parent be7f32f5eb
commit c5f159ed6f
2 changed files with 163 additions and 111 deletions

View File

@@ -58,6 +58,7 @@ use std::pin::Pin;
use std::sync::Arc;
use sysinfo::{System, SystemExt};
use system_health::observe_system_health_bn;
use task_spawner::{Priority, TaskSpawner};
use tokio::sync::mpsc::{Sender, UnboundedSender};
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{
@@ -490,6 +491,11 @@ pub fn serve<T: BeaconChainTypes>(
let app_start = std::time::Instant::now();
let app_start_filter = warp::any().map(move || app_start);
// Create a `warp` filter that provides access the `TaskSpawner`.
let beacon_processor_send = ctx.beacon_processor_send.clone();
let task_spawner_filter =
warp::any().map(move || TaskSpawner::new(beacon_processor_send.clone()));
/*
*
* Start of HTTP method definitions.
@@ -501,17 +507,20 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("beacon"))
.and(warp::path("genesis"))
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(|chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
let genesis_data = api_types::GenesisData {
genesis_time: chain.genesis_time,
genesis_validators_root: chain.genesis_validators_root,
genesis_fork_version: chain.spec.genesis_fork_version,
};
Ok(api_types::GenericResponse::from(genesis_data))
})
});
.and_then(
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let genesis_data = api_types::GenesisData {
genesis_time: chain.genesis_time,
genesis_validators_root: chain.genesis_validators_root,
genesis_fork_version: chain.spec.genesis_fork_version,
};
Ok(api_types::GenericResponse::from(genesis_data))
})
},
);
/*
* beacon/states/{state_id}
@@ -525,6 +534,7 @@ pub fn serve<T: BeaconChainTypes>(
"Invalid state ID".to_string(),
))
}))
.and(task_spawner_filter.clone())
.and(chain_filter.clone());
// GET beacon/states/{state_id}/root
@@ -532,65 +542,77 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("root"))
.and(warp::path::end())
.and_then(|state_id: StateId, chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
let (root, execution_optimistic, finalized) = state_id.root(&chain)?;
Ok(root)
.map(api_types::RootData::from)
.map(api_types::GenericResponse::from)
.map(|resp| {
resp.add_execution_optimistic_finalized(execution_optimistic, finalized)
})
})
});
.and_then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (root, execution_optimistic, finalized) = state_id.root(&chain)?;
Ok(root)
.map(api_types::RootData::from)
.map(api_types::GenericResponse::from)
.map(|resp| {
resp.add_execution_optimistic_finalized(execution_optimistic, finalized)
})
})
},
);
// GET beacon/states/{state_id}/fork
let get_beacon_state_fork = beacon_states_path
.clone()
.and(warp::path("fork"))
.and(warp::path::end())
.and_then(|state_id: StateId, chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
let (fork, execution_optimistic, finalized) =
state_id.fork_and_execution_optimistic_and_finalized(&chain)?;
Ok(api_types::ExecutionOptimisticFinalizedResponse {
data: fork,
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
.and_then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (fork, execution_optimistic, finalized) =
state_id.fork_and_execution_optimistic_and_finalized(&chain)?;
Ok(api_types::ExecutionOptimisticFinalizedResponse {
data: fork,
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
})
})
})
});
},
);
// GET beacon/states/{state_id}/finality_checkpoints
let get_beacon_state_finality_checkpoints = beacon_states_path
.clone()
.and(warp::path("finality_checkpoints"))
.and(warp::path::end())
.and_then(|state_id: StateId, chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
let (data, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
Ok((
api_types::FinalityCheckpointsData {
previous_justified: state.previous_justified_checkpoint(),
current_justified: state.current_justified_checkpoint(),
finalized: state.finalized_checkpoint(),
},
execution_optimistic,
finalized,
))
},
)?;
.and_then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (data, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
Ok((
api_types::FinalityCheckpointsData {
previous_justified: state.previous_justified_checkpoint(),
current_justified: state.current_justified_checkpoint(),
finalized: state.finalized_checkpoint(),
},
execution_optimistic,
finalized,
))
},
)?;
Ok(api_types::ExecutionOptimisticFinalizedResponse {
data,
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
Ok(api_types::ExecutionOptimisticFinalizedResponse {
data,
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
})
})
})
});
},
);
// GET beacon/states/{state_id}/validator_balances?id
let get_beacon_state_validator_balances = beacon_states_path
@@ -600,9 +622,10 @@ pub fn serve<T: BeaconChainTypes>(
.and(multi_key_query::<api_types::ValidatorBalancesQuery>())
.and_then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query_res: Result<api_types::ValidatorBalancesQuery, warp::Rejection>| {
blocking_json_task(move || {
task_spawner.blocking_json_task(Priority::P1, move || {
let query = query_res?;
let (data, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
@@ -657,9 +680,10 @@ pub fn serve<T: BeaconChainTypes>(
.and(multi_key_query::<api_types::ValidatorsQuery>())
.and_then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query_res: Result<api_types::ValidatorsQuery, warp::Rejection>| {
blocking_json_task(move || {
task_spawner.blocking_json_task(Priority::P1, move || {
let query = query_res?;
let (data, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
@@ -739,8 +763,11 @@ pub fn serve<T: BeaconChainTypes>(
}))
.and(warp::path::end())
.and_then(
|state_id: StateId, chain: Arc<BeaconChain<T>>, validator_id: ValidatorId| {
blocking_json_task(move || {
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
validator_id: ValidatorId| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (data, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
@@ -799,8 +826,11 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::query::<api_types::CommitteesQuery>())
.and(warp::path::end())
.and_then(
|state_id: StateId, chain: Arc<BeaconChain<T>>, query: api_types::CommitteesQuery| {
blocking_json_task(move || {
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query: api_types::CommitteesQuery| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (data, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
@@ -980,9 +1010,10 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and_then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query: api_types::SyncCommitteesQuery| {
blocking_json_task(move || {
task_spawner.blocking_json_task(Priority::P1, move || {
let (sync_committee, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
@@ -1044,8 +1075,11 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::query::<api_types::RandaoQuery>())
.and(warp::path::end())
.and_then(
|state_id: StateId, chain: Arc<BeaconChain<T>>, query: api_types::RandaoQuery| {
blocking_json_task(move || {
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query: api_types::RandaoQuery| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (randao, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
@@ -1080,10 +1114,13 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("headers"))
.and(warp::query::<api_types::HeadersQuery>())
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(
|query: api_types::HeadersQuery, chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
|query: api_types::HeadersQuery,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (root, block, execution_optimistic, finalized) =
match (query.slot, query.parent_root) {
// No query parameters, return the canonical head block.
@@ -1177,36 +1214,41 @@ pub fn serve<T: BeaconChainTypes>(
))
}))
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and_then(|block_id: BlockId, chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
let (root, execution_optimistic, finalized) = block_id.root(&chain)?;
// Ignore the second `execution_optimistic` since the first one has more
// information about the original request.
let (block, _execution_optimistic, _finalized) =
BlockId::from_root(root).blinded_block(&chain)?;
.and_then(
|block_id: BlockId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (root, execution_optimistic, finalized) = block_id.root(&chain)?;
// Ignore the second `execution_optimistic` since the first one has more
// information about the original request.
let (block, _execution_optimistic, _finalized) =
BlockId::from_root(root).blinded_block(&chain)?;
let canonical = chain
.block_root_at_slot(block.slot(), WhenSlotSkipped::None)
.map_err(warp_utils::reject::beacon_chain_error)?
.map_or(false, |canonical| root == canonical);
let canonical = chain
.block_root_at_slot(block.slot(), WhenSlotSkipped::None)
.map_err(warp_utils::reject::beacon_chain_error)?
.map_or(false, |canonical| root == canonical);
let data = api_types::BlockHeaderData {
root,
canonical,
header: api_types::BlockHeaderAndSignature {
message: block.message().block_header(),
signature: block.signature().clone().into(),
},
};
let data = api_types::BlockHeaderData {
root,
canonical,
header: api_types::BlockHeaderAndSignature {
message: block.message().block_header(),
signature: block.signature().clone().into(),
},
};
Ok(api_types::ExecutionOptimisticFinalizedResponse {
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
data,
Ok(api_types::ExecutionOptimisticFinalizedResponse {
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
data,
})
})
})
});
},
);
/*
* beacon/blocks
@@ -1218,24 +1260,28 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("blocks"))
.and(warp::path::end())
.and(warp::body::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.and_then(
|block: Arc<SignedBeaconBlock<T::EthSpec>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| async move {
publish_blocks::publish_block(
None,
ProvenancedBlock::local(block),
chain,
&network_tx,
log,
BroadcastValidation::default(),
)
.await
.map(|()| warp::reply().into_response())
log: Logger| {
task_spawner.async_task(Priority::P1, async move {
publish_blocks::publish_block(
None,
ProvenancedBlock::local(block),
chain,
&network_tx,
log,
BroadcastValidation::default(),
)
.await
.map(|()| warp::reply().into_response())
})
},
);

View File

@@ -8,6 +8,20 @@ use warp::reply::{Reply, Response};
#[derive(Clone, Copy)]
pub enum Priority {
P0,
P1,
}
impl Priority {
fn work_event<E: EthSpec>(&self, process_fn: BlockingOrAsync) -> WorkEvent<E> {
let work = match self {
Priority::P0 => Work::ApiRequestP0(process_fn),
Priority::P1 => unimplemented!("P1"),
};
WorkEvent {
drop_during_sync: false,
work,
}
}
}
pub struct TaskSpawner<E: EthSpec> {
@@ -22,7 +36,7 @@ impl<E: EthSpec> TaskSpawner<E> {
}
pub async fn blocking_json_task<F, T>(
&self,
self,
priority: Priority,
func: F,
) -> Result<Response, warp::Rejection>
@@ -112,15 +126,7 @@ async fn send_to_beacon_processor<E: EthSpec, T>(
where
T: Serialize + Send + 'static,
{
let work = match priority {
Priority::P0 => Work::ApiRequestP0(process_fn),
};
let work_event = WorkEvent {
drop_during_sync: false,
work,
};
let error_message = match beacon_processor_send.try_send(work_event) {
let error_message = match beacon_processor_send.try_send(priority.work_event(process_fn)) {
Ok(()) => {
match rx.await {
// The beacon processor executed the task and sent a result.