diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 4f4a57be29..56ba86bb16 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -58,6 +58,7 @@ use std::pin::Pin; use std::sync::Arc; use sysinfo::{System, SystemExt}; use system_health::observe_system_health_bn; +use task_spawner::{Priority, TaskSpawner}; use tokio::sync::mpsc::{Sender, UnboundedSender}; use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use types::{ @@ -490,6 +491,11 @@ pub fn serve( let app_start = std::time::Instant::now(); let app_start_filter = warp::any().map(move || app_start); + // Create a `warp` filter that provides access the `TaskSpawner`. + let beacon_processor_send = ctx.beacon_processor_send.clone(); + let task_spawner_filter = + warp::any().map(move || TaskSpawner::new(beacon_processor_send.clone())); + /* * * Start of HTTP method definitions. @@ -501,17 +507,20 @@ pub fn serve( .and(warp::path("beacon")) .and(warp::path("genesis")) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then(|chain: Arc>| { - blocking_json_task(move || { - let genesis_data = api_types::GenesisData { - genesis_time: chain.genesis_time, - genesis_validators_root: chain.genesis_validators_root, - genesis_fork_version: chain.spec.genesis_fork_version, - }; - Ok(api_types::GenericResponse::from(genesis_data)) - }) - }); + .and_then( + |task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let genesis_data = api_types::GenesisData { + genesis_time: chain.genesis_time, + genesis_validators_root: chain.genesis_validators_root, + genesis_fork_version: chain.spec.genesis_fork_version, + }; + Ok(api_types::GenericResponse::from(genesis_data)) + }) + }, + ); /* * beacon/states/{state_id} @@ -525,6 +534,7 @@ pub fn serve( "Invalid state ID".to_string(), )) })) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()); // GET beacon/states/{state_id}/root @@ -532,65 +542,77 @@ pub fn serve( .clone() .and(warp::path("root")) .and(warp::path::end()) - .and_then(|state_id: StateId, chain: Arc>| { - blocking_json_task(move || { - let (root, execution_optimistic, finalized) = state_id.root(&chain)?; - Ok(root) - .map(api_types::RootData::from) - .map(api_types::GenericResponse::from) - .map(|resp| { - resp.add_execution_optimistic_finalized(execution_optimistic, finalized) - }) - }) - }); + .and_then( + |state_id: StateId, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let (root, execution_optimistic, finalized) = state_id.root(&chain)?; + Ok(root) + .map(api_types::RootData::from) + .map(api_types::GenericResponse::from) + .map(|resp| { + resp.add_execution_optimistic_finalized(execution_optimistic, finalized) + }) + }) + }, + ); // GET beacon/states/{state_id}/fork let get_beacon_state_fork = beacon_states_path .clone() .and(warp::path("fork")) .and(warp::path::end()) - .and_then(|state_id: StateId, chain: Arc>| { - blocking_json_task(move || { - let (fork, execution_optimistic, finalized) = - state_id.fork_and_execution_optimistic_and_finalized(&chain)?; - Ok(api_types::ExecutionOptimisticFinalizedResponse { - data: fork, - execution_optimistic: Some(execution_optimistic), - finalized: Some(finalized), + .and_then( + |state_id: StateId, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let (fork, execution_optimistic, finalized) = + state_id.fork_and_execution_optimistic_and_finalized(&chain)?; + Ok(api_types::ExecutionOptimisticFinalizedResponse { + data: fork, + execution_optimistic: Some(execution_optimistic), + finalized: Some(finalized), + }) }) - }) - }); + }, + ); // GET beacon/states/{state_id}/finality_checkpoints let get_beacon_state_finality_checkpoints = beacon_states_path .clone() .and(warp::path("finality_checkpoints")) .and(warp::path::end()) - .and_then(|state_id: StateId, chain: Arc>| { - blocking_json_task(move || { - let (data, execution_optimistic, finalized) = state_id - .map_state_and_execution_optimistic_and_finalized( - &chain, - |state, execution_optimistic, finalized| { - Ok(( - api_types::FinalityCheckpointsData { - previous_justified: state.previous_justified_checkpoint(), - current_justified: state.current_justified_checkpoint(), - finalized: state.finalized_checkpoint(), - }, - execution_optimistic, - finalized, - )) - }, - )?; + .and_then( + |state_id: StateId, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let (data, execution_optimistic, finalized) = state_id + .map_state_and_execution_optimistic_and_finalized( + &chain, + |state, execution_optimistic, finalized| { + Ok(( + api_types::FinalityCheckpointsData { + previous_justified: state.previous_justified_checkpoint(), + current_justified: state.current_justified_checkpoint(), + finalized: state.finalized_checkpoint(), + }, + execution_optimistic, + finalized, + )) + }, + )?; - Ok(api_types::ExecutionOptimisticFinalizedResponse { - data, - execution_optimistic: Some(execution_optimistic), - finalized: Some(finalized), + Ok(api_types::ExecutionOptimisticFinalizedResponse { + data, + execution_optimistic: Some(execution_optimistic), + finalized: Some(finalized), + }) }) - }) - }); + }, + ); // GET beacon/states/{state_id}/validator_balances?id let get_beacon_state_validator_balances = beacon_states_path @@ -600,9 +622,10 @@ pub fn serve( .and(multi_key_query::()) .and_then( |state_id: StateId, + task_spawner: TaskSpawner, chain: Arc>, query_res: Result| { - blocking_json_task(move || { + task_spawner.blocking_json_task(Priority::P1, move || { let query = query_res?; let (data, execution_optimistic, finalized) = state_id .map_state_and_execution_optimistic_and_finalized( @@ -657,9 +680,10 @@ pub fn serve( .and(multi_key_query::()) .and_then( |state_id: StateId, + task_spawner: TaskSpawner, chain: Arc>, query_res: Result| { - blocking_json_task(move || { + task_spawner.blocking_json_task(Priority::P1, move || { let query = query_res?; let (data, execution_optimistic, finalized) = state_id .map_state_and_execution_optimistic_and_finalized( @@ -739,8 +763,11 @@ pub fn serve( })) .and(warp::path::end()) .and_then( - |state_id: StateId, chain: Arc>, validator_id: ValidatorId| { - blocking_json_task(move || { + |state_id: StateId, + task_spawner: TaskSpawner, + chain: Arc>, + validator_id: ValidatorId| { + task_spawner.blocking_json_task(Priority::P1, move || { let (data, execution_optimistic, finalized) = state_id .map_state_and_execution_optimistic_and_finalized( &chain, @@ -799,8 +826,11 @@ pub fn serve( .and(warp::query::()) .and(warp::path::end()) .and_then( - |state_id: StateId, chain: Arc>, query: api_types::CommitteesQuery| { - blocking_json_task(move || { + |state_id: StateId, + task_spawner: TaskSpawner, + chain: Arc>, + query: api_types::CommitteesQuery| { + task_spawner.blocking_json_task(Priority::P1, move || { let (data, execution_optimistic, finalized) = state_id .map_state_and_execution_optimistic_and_finalized( &chain, @@ -980,9 +1010,10 @@ pub fn serve( .and(warp::path::end()) .and_then( |state_id: StateId, + task_spawner: TaskSpawner, chain: Arc>, query: api_types::SyncCommitteesQuery| { - blocking_json_task(move || { + task_spawner.blocking_json_task(Priority::P1, move || { let (sync_committee, execution_optimistic, finalized) = state_id .map_state_and_execution_optimistic_and_finalized( &chain, @@ -1044,8 +1075,11 @@ pub fn serve( .and(warp::query::()) .and(warp::path::end()) .and_then( - |state_id: StateId, chain: Arc>, query: api_types::RandaoQuery| { - blocking_json_task(move || { + |state_id: StateId, + task_spawner: TaskSpawner, + chain: Arc>, + query: api_types::RandaoQuery| { + task_spawner.blocking_json_task(Priority::P1, move || { let (randao, execution_optimistic, finalized) = state_id .map_state_and_execution_optimistic_and_finalized( &chain, @@ -1080,10 +1114,13 @@ pub fn serve( .and(warp::path("headers")) .and(warp::query::()) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and_then( - |query: api_types::HeadersQuery, chain: Arc>| { - blocking_json_task(move || { + |query: api_types::HeadersQuery, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { let (root, block, execution_optimistic, finalized) = match (query.slot, query.parent_root) { // No query parameters, return the canonical head block. @@ -1177,36 +1214,41 @@ pub fn serve( )) })) .and(warp::path::end()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then(|block_id: BlockId, chain: Arc>| { - blocking_json_task(move || { - let (root, execution_optimistic, finalized) = block_id.root(&chain)?; - // Ignore the second `execution_optimistic` since the first one has more - // information about the original request. - let (block, _execution_optimistic, _finalized) = - BlockId::from_root(root).blinded_block(&chain)?; + .and_then( + |block_id: BlockId, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let (root, execution_optimistic, finalized) = block_id.root(&chain)?; + // Ignore the second `execution_optimistic` since the first one has more + // information about the original request. + let (block, _execution_optimistic, _finalized) = + BlockId::from_root(root).blinded_block(&chain)?; - let canonical = chain - .block_root_at_slot(block.slot(), WhenSlotSkipped::None) - .map_err(warp_utils::reject::beacon_chain_error)? - .map_or(false, |canonical| root == canonical); + let canonical = chain + .block_root_at_slot(block.slot(), WhenSlotSkipped::None) + .map_err(warp_utils::reject::beacon_chain_error)? + .map_or(false, |canonical| root == canonical); - let data = api_types::BlockHeaderData { - root, - canonical, - header: api_types::BlockHeaderAndSignature { - message: block.message().block_header(), - signature: block.signature().clone().into(), - }, - }; + let data = api_types::BlockHeaderData { + root, + canonical, + header: api_types::BlockHeaderAndSignature { + message: block.message().block_header(), + signature: block.signature().clone().into(), + }, + }; - Ok(api_types::ExecutionOptimisticFinalizedResponse { - execution_optimistic: Some(execution_optimistic), - finalized: Some(finalized), - data, + Ok(api_types::ExecutionOptimisticFinalizedResponse { + execution_optimistic: Some(execution_optimistic), + finalized: Some(finalized), + data, + }) }) - }) - }); + }, + ); /* * beacon/blocks @@ -1218,24 +1260,28 @@ pub fn serve( .and(warp::path("blocks")) .and(warp::path::end()) .and(warp::body::json()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) .and(log_filter.clone()) .and_then( |block: Arc>, + task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, - log: Logger| async move { - publish_blocks::publish_block( - None, - ProvenancedBlock::local(block), - chain, - &network_tx, - log, - BroadcastValidation::default(), - ) - .await - .map(|()| warp::reply().into_response()) + log: Logger| { + task_spawner.async_task(Priority::P1, async move { + publish_blocks::publish_block( + None, + ProvenancedBlock::local(block), + chain, + &network_tx, + log, + BroadcastValidation::default(), + ) + .await + .map(|()| warp::reply().into_response()) + }) }, ); diff --git a/beacon_node/http_api/src/task_spawner.rs b/beacon_node/http_api/src/task_spawner.rs index d170d31118..0f6717f3a2 100644 --- a/beacon_node/http_api/src/task_spawner.rs +++ b/beacon_node/http_api/src/task_spawner.rs @@ -8,6 +8,20 @@ use warp::reply::{Reply, Response}; #[derive(Clone, Copy)] pub enum Priority { P0, + P1, +} + +impl Priority { + fn work_event(&self, process_fn: BlockingOrAsync) -> WorkEvent { + let work = match self { + Priority::P0 => Work::ApiRequestP0(process_fn), + Priority::P1 => unimplemented!("P1"), + }; + WorkEvent { + drop_during_sync: false, + work, + } + } } pub struct TaskSpawner { @@ -22,7 +36,7 @@ impl TaskSpawner { } pub async fn blocking_json_task( - &self, + self, priority: Priority, func: F, ) -> Result @@ -112,15 +126,7 @@ async fn send_to_beacon_processor( where T: Serialize + Send + 'static, { - let work = match priority { - Priority::P0 => Work::ApiRequestP0(process_fn), - }; - let work_event = WorkEvent { - drop_during_sync: false, - work, - }; - - let error_message = match beacon_processor_send.try_send(work_event) { + let error_message = match beacon_processor_send.try_send(priority.work_event(process_fn)) { Ok(()) => { match rx.await { // The beacon processor executed the task and sent a result.