Start adding HTTP queues

This commit is contained in:
Paul Hauner
2023-07-03 15:42:25 +10:00
parent c25825a539
commit 238837bc91

View File

@@ -167,6 +167,10 @@ const MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN: usize = 16_384;
/// will be stored before we start dropping them.
const MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN: usize = 1_024;
/// The maximum number of priority-0 (highest priority) messages that will be queued before
/// they begin to be dropped.
const MAX_API_REQUEST_P0_QUEUE_LEN: usize = 1_024;
/// The name of the manager tokio task.
const MANAGER_TASK_NAME: &str = "beacon_processor_manager";
@@ -215,6 +219,7 @@ pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update";
pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change";
pub const API_REQUEST_P0: &str = "api_request_p0";
/// A simple first-in-first-out queue with a maximum length.
struct FifoQueue<T> {
@@ -363,7 +368,7 @@ impl<E: EthSpec> WorkEvent<E> {
}
}
impl<E: EthSpec> std::convert::From<ReadyWork> for WorkEvent<E> {
impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
fn from(ready_work: ReadyWork) -> Self {
match ready_work {
ReadyWork::Block(QueuedGossipBlock {
@@ -465,6 +470,10 @@ impl<E: EthSpec> BeaconProcessorSend<E> {
pub type AsyncFn = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
pub type BlockingFn = Box<dyn FnOnce() + Send + Sync>;
pub type BlockingFnWithManualSendOnIdle = Box<dyn FnOnce(SendOnDrop) + Send + Sync>;
pub enum BlockingOrAsync {
Blocking(BlockingFn),
Async(AsyncFn),
}
/// Indicates the type of work to be performed and therefore its priority and
/// queuing specifics.
@@ -523,6 +532,7 @@ pub enum Work<E: EthSpec> {
BlocksByRootsRequest(BlockingFnWithManualSendOnIdle),
GossipBlsToExecutionChange(BlockingFn),
LightClientBootstrapRequest(BlockingFn),
ApiRequestP0(BlockingOrAsync),
}
impl<E: EthSpec> fmt::Debug for Work<E> {
@@ -560,6 +570,7 @@ impl<E: EthSpec> Work<E> {
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
Work::GossipBlsToExecutionChange(_) => GOSSIP_BLS_TO_EXECUTION_CHANGE,
Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE,
Work::ApiRequestP0 { .. } => API_REQUEST_P0,
}
}
}
@@ -713,6 +724,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
let mut lcbootstrap_queue = FifoQueue::new(MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN);
let mut api_request_p0_queue = FifoQueue::new(MAX_API_REQUEST_P0_QUEUE_LEN);
// Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to
// receive them back once they are ready (`ready_work_rx`).
let (ready_work_tx, ready_work_rx) =
@@ -847,6 +860,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
// required to verify some attestations.
} else if let Some(item) = gossip_block_queue.pop() {
self.spawn_worker(item, idle_tx);
// Check the priority 0 API requests after blocks, but before attestations.
} else if let Some(item) = api_request_p0_queue.pop() {
self.spawn_worker(item, idle_tx);
// Check the aggregates, *then* the unaggregates since we assume that
// aggregates are more valuable to local validators and effectively give us
// more information with less signature verification time.
@@ -1124,6 +1140,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::UnknownLightClientOptimisticUpdate { .. } => {
unknown_light_client_update_queue.push(work, work_id, &self.log)
}
Work::ApiRequestP0 { .. } => {
api_request_p0_queue.push(work, work_id, &self.log)
}
}
}
}
@@ -1295,6 +1314,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
task_spawner.spawn_blocking_with_manual_send_idle(work)
}
Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
Work::ApiRequestP0(process_fn) => match process_fn {
BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn),
BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn),
},
Work::GossipVoluntaryExit(process_fn)
| Work::GossipProposerSlashing(process_fn)
| Work::GossipAttesterSlashing(process_fn)