From 238837bc91cfd2bf851a327807661841a9d699ee Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 3 Jul 2023 15:42:25 +1000 Subject: [PATCH] Start adding HTTP queues --- beacon_node/beacon_processor/src/lib.rs | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 88066f2a30..5a68cf5e6b 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -167,6 +167,10 @@ const MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN: usize = 16_384; /// will be stored before we start dropping them. const MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN: usize = 1_024; +/// The maximum number of priority-0 (highest priority) messages that will be queued before +/// they begin to be dropped. +const MAX_API_REQUEST_P0_QUEUE_LEN: usize = 1_024; + /// The name of the manager tokio task. const MANAGER_TASK_NAME: &str = "beacon_processor_manager"; @@ -215,6 +219,7 @@ pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update"; pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change"; +pub const API_REQUEST_P0: &str = "api_request_p0"; /// A simple first-in-first-out queue with a maximum length. struct FifoQueue { @@ -363,7 +368,7 @@ impl WorkEvent { } } -impl std::convert::From for WorkEvent { +impl From for WorkEvent { fn from(ready_work: ReadyWork) -> Self { match ready_work { ReadyWork::Block(QueuedGossipBlock { @@ -465,6 +470,10 @@ impl BeaconProcessorSend { pub type AsyncFn = Pin + Send + Sync>>; pub type BlockingFn = Box; pub type BlockingFnWithManualSendOnIdle = Box; +pub enum BlockingOrAsync { + Blocking(BlockingFn), + Async(AsyncFn), +} /// Indicates the type of work to be performed and therefore its priority and /// queuing specifics. @@ -523,6 +532,7 @@ pub enum Work { BlocksByRootsRequest(BlockingFnWithManualSendOnIdle), GossipBlsToExecutionChange(BlockingFn), LightClientBootstrapRequest(BlockingFn), + ApiRequestP0(BlockingOrAsync), } impl fmt::Debug for Work { @@ -560,6 +570,7 @@ impl Work { Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, Work::GossipBlsToExecutionChange(_) => GOSSIP_BLS_TO_EXECUTION_CHANGE, Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE, + Work::ApiRequestP0 { .. } => API_REQUEST_P0, } } } @@ -713,6 +724,8 @@ impl BeaconProcessor { let mut lcbootstrap_queue = FifoQueue::new(MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN); + let mut api_request_p0_queue = FifoQueue::new(MAX_API_REQUEST_P0_QUEUE_LEN); + // Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to // receive them back once they are ready (`ready_work_rx`). let (ready_work_tx, ready_work_rx) = @@ -847,6 +860,9 @@ impl BeaconProcessor { // required to verify some attestations. } else if let Some(item) = gossip_block_queue.pop() { self.spawn_worker(item, idle_tx); + // Check the priority 0 API requests after blocks, but before attestations. + } else if let Some(item) = api_request_p0_queue.pop() { + self.spawn_worker(item, idle_tx); // Check the aggregates, *then* the unaggregates since we assume that // aggregates are more valuable to local validators and effectively give us // more information with less signature verification time. @@ -1124,6 +1140,9 @@ impl BeaconProcessor { Work::UnknownLightClientOptimisticUpdate { .. } => { unknown_light_client_update_queue.push(work, work_id, &self.log) } + Work::ApiRequestP0 { .. } => { + api_request_p0_queue.push(work, work_id, &self.log) + } } } } @@ -1295,6 +1314,10 @@ impl BeaconProcessor { task_spawner.spawn_blocking_with_manual_send_idle(work) } Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn), + Work::ApiRequestP0(process_fn) => match process_fn { + BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn), + BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn), + }, Work::GossipVoluntaryExit(process_fn) | Work::GossipProposerSlashing(process_fn) | Work::GossipAttesterSlashing(process_fn)