diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 5a68cf5e6b..f87b1a205d 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -171,6 +171,10 @@ const MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN: usize = 1_024; /// they begin to be dropped. const MAX_API_REQUEST_P0_QUEUE_LEN: usize = 1_024; +/// The maximum number of priority-1 (second-highest priority) messages that will be queued before +/// they begin to be dropped. +const MAX_API_REQUEST_P1_QUEUE_LEN: usize = 1_024; + /// The name of the manager tokio task. const MANAGER_TASK_NAME: &str = "beacon_processor_manager"; @@ -220,6 +224,7 @@ pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update"; pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change"; pub const API_REQUEST_P0: &str = "api_request_p0"; +pub const API_REQUEST_P1: &str = "api_request_p1"; /// A simple first-in-first-out queue with a maximum length. struct FifoQueue { @@ -533,6 +538,7 @@ pub enum Work { GossipBlsToExecutionChange(BlockingFn), LightClientBootstrapRequest(BlockingFn), ApiRequestP0(BlockingOrAsync), + ApiRequestP1(BlockingOrAsync), } impl fmt::Debug for Work { @@ -571,6 +577,7 @@ impl Work { Work::GossipBlsToExecutionChange(_) => GOSSIP_BLS_TO_EXECUTION_CHANGE, Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE, Work::ApiRequestP0 { .. } => API_REQUEST_P0, + Work::ApiRequestP1 { .. } => API_REQUEST_P1, } } } @@ -725,6 +732,7 @@ impl BeaconProcessor { let mut lcbootstrap_queue = FifoQueue::new(MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN); let mut api_request_p0_queue = FifoQueue::new(MAX_API_REQUEST_P0_QUEUE_LEN); + let mut api_request_p1_queue = FifoQueue::new(MAX_API_REQUEST_P1_QUEUE_LEN); // Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to // receive them back once they are ready (`ready_work_rx`). @@ -1018,6 +1026,12 @@ impl BeaconProcessor { self.spawn_worker(item, idle_tx); } else if let Some(item) = gossip_bls_to_execution_change_queue.pop() { self.spawn_worker(item, idle_tx); + // Check the priority 1 API requests after we've + // processed all the interesting things from the network + // and things required for us to stay in good repute + // with our P2P peers. + } else if let Some(item) = api_request_p0_queue.pop() { + self.spawn_worker(item, idle_tx); // Handle backfill sync chain segments. } else if let Some(item) = backfill_chain_segment.pop() { self.spawn_worker(item, idle_tx); @@ -1143,6 +1157,9 @@ impl BeaconProcessor { Work::ApiRequestP0 { .. } => { api_request_p0_queue.push(work, work_id, &self.log) } + Work::ApiRequestP1 { .. } => { + api_request_p1_queue.push(work, work_id, &self.log) + } } } } @@ -1314,7 +1331,7 @@ impl BeaconProcessor { task_spawner.spawn_blocking_with_manual_send_idle(work) } Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn), - Work::ApiRequestP0(process_fn) => match process_fn { + Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn { BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn), BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn), }, diff --git a/beacon_node/http_api/src/task_spawner.rs b/beacon_node/http_api/src/task_spawner.rs index 0a7a79cf0a..8915f91298 100644 --- a/beacon_node/http_api/src/task_spawner.rs +++ b/beacon_node/http_api/src/task_spawner.rs @@ -15,7 +15,7 @@ impl Priority { fn work_event(&self, process_fn: BlockingOrAsync) -> WorkEvent { let work = match self { Priority::P0 => Work::ApiRequestP0(process_fn), - Priority::P1 => unimplemented!("P1"), + Priority::P1 => Work::ApiRequestP1(process_fn), }; WorkEvent { drop_during_sync: false,