From af274029e8c61fe01048105ba1f192cc762effeb Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Tue, 23 Sep 2025 23:37:34 -0700 Subject: [PATCH] Run reconstruction inside a scoped rayon pool (#8075) Co-Authored-By: Jimmy Chen Co-Authored-By: Eitan Seri-Levi Co-Authored-By: Eitan Seri-Levi --- Cargo.lock | 3 +- beacon_node/beacon_chain/src/beacon_chain.rs | 16 ++--- beacon_node/beacon_processor/Cargo.toml | 1 - beacon_node/beacon_processor/src/lib.rs | 20 ++----- .../beacon_processor/src/rayon_manager.rs | 27 --------- beacon_node/client/src/builder.rs | 2 - beacon_node/http_api/src/test_utils.rs | 2 - .../src/network_beacon_processor/tests.rs | 2 - common/task_executor/Cargo.toml | 2 + common/task_executor/src/lib.rs | 50 +++++++++++++++- .../task_executor/src/rayon_pool_provider.rs | 58 +++++++++++++++++++ 11 files changed, 123 insertions(+), 60 deletions(-) delete mode 100644 beacon_node/beacon_processor/src/rayon_manager.rs create mode 100644 common/task_executor/src/rayon_pool_provider.rs diff --git a/Cargo.lock b/Cargo.lock index c100fa5ae2..ee65108097 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -980,7 +980,6 @@ dependencies = [ "metrics", "num_cpus", "parking_lot 0.12.3", - "rayon", "serde", "slot_clock", "strum", @@ -9232,6 +9231,8 @@ dependencies = [ "async-channel 1.9.0", "futures", "metrics", + "num_cpus", + "rayon", "tokio", "tracing", ] diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index ef3c2f52e0..4f0c6aada0 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -124,7 +124,7 @@ use store::{ BlobSidecarListFromRoot, DBColumn, DatabaseBlock, Error as DBError, HotColdDB, HotStateSummary, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp, }; -use task_executor::{ShutdownReason, TaskExecutor}; +use task_executor::{RayonPoolType, ShutdownReason, TaskExecutor}; use tokio_stream::Stream; use tracing::{Span, debug, debug_span, error, info, 
info_span, instrument, trace, warn}; use tree_hash::TreeHash; @@ -3274,16 +3274,12 @@ impl BeaconChain { let current_span = Span::current(); let result = self .task_executor - .spawn_blocking_handle( - move || { - let _guard = current_span.enter(); - data_availability_checker.reconstruct_data_columns(&block_root) - }, - "reconstruct_data_columns", - ) - .ok_or(BeaconChainError::RuntimeShutdown)? + .spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || { + let _guard = current_span.enter(); + data_availability_checker.reconstruct_data_columns(&block_root) + }) .await - .map_err(BeaconChainError::TokioJoin)??; + .map_err(|_| BeaconChainError::RuntimeShutdown)??; match result { DataColumnReconstructionResult::Success((availability, data_columns_to_publish)) => { diff --git a/beacon_node/beacon_processor/Cargo.toml b/beacon_node/beacon_processor/Cargo.toml index 262badf7f9..afd4660c9a 100644 --- a/beacon_node/beacon_processor/Cargo.toml +++ b/beacon_node/beacon_processor/Cargo.toml @@ -12,7 +12,6 @@ logging = { workspace = true } metrics = { workspace = true } num_cpus = { workspace = true } parking_lot = { workspace = true } -rayon = { workspace = true } serde = { workspace = true } slot_clock = { workspace = true } strum = { workspace = true } diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 64aeb4ceaf..28ed0cca91 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -38,7 +38,6 @@ //! checks the queues to see if there are more parcels of work that can be spawned in a new worker //! task. 
-use crate::rayon_manager::RayonManager; use crate::work_reprocessing_queue::{ QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage, }; @@ -48,7 +47,6 @@ use lighthouse_network::{MessageId, NetworkGlobals, PeerId}; use logging::TimeLatch; use logging::crit; use parking_lot::Mutex; -use rayon::ThreadPool; pub use scheduler::work_reprocessing_queue; use serde::{Deserialize, Serialize}; use slot_clock::SlotClock; @@ -61,7 +59,7 @@ use std::sync::Arc; use std::task::Context; use std::time::{Duration, Instant}; use strum::IntoStaticStr; -use task_executor::TaskExecutor; +use task_executor::{RayonPoolType, TaskExecutor}; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; use tracing::{debug, error, trace, warn}; @@ -76,7 +74,6 @@ use work_reprocessing_queue::{ }; mod metrics; -pub mod rayon_manager; pub mod scheduler; /// The maximum size of the channel for work events to the `BeaconProcessor`. @@ -810,7 +807,6 @@ pub struct BeaconProcessor { pub network_globals: Arc>, pub executor: TaskExecutor, pub current_workers: usize, - pub rayon_manager: RayonManager, pub config: BeaconProcessorConfig, } @@ -1609,10 +1605,7 @@ impl BeaconProcessor { } Work::ChainSegmentBackfill(process_fn) => { if self.config.enable_backfill_rate_limiting { - task_spawner.spawn_blocking_with_rayon( - self.rayon_manager.low_priority_threadpool.clone(), - process_fn, - ) + task_spawner.spawn_blocking_with_rayon(RayonPoolType::LowPriority, process_fn) } else { // use the global rayon thread pool if backfill rate limiting is disabled. task_spawner.spawn_blocking(process_fn) @@ -1681,17 +1674,16 @@ impl TaskSpawner { } /// Spawns a blocking task on a rayon thread pool, dropping the `SendOnDrop` after task completion. 
- fn spawn_blocking_with_rayon(self, thread_pool: Arc, task: F) + fn spawn_blocking_with_rayon(self, rayon_pool_type: RayonPoolType, task: F) where F: FnOnce() + Send + 'static, { - self.executor.spawn_blocking( + self.executor.spawn_blocking_with_rayon( move || { - thread_pool.install(|| { - task(); - }); + task(); drop(self.send_idle_on_drop) }, + rayon_pool_type, WORKER_TASK_NAME, ) } diff --git a/beacon_node/beacon_processor/src/rayon_manager.rs b/beacon_node/beacon_processor/src/rayon_manager.rs deleted file mode 100644 index 99fe32d5cc..0000000000 --- a/beacon_node/beacon_processor/src/rayon_manager.rs +++ /dev/null @@ -1,27 +0,0 @@ -use rayon::{ThreadPool, ThreadPoolBuilder}; -use std::sync::Arc; - -const DEFAULT_LOW_PRIORITY_DIVISOR: usize = 4; -const MINIMUM_LOW_PRIORITY_THREAD_COUNT: usize = 1; - -pub struct RayonManager { - /// Smaller rayon thread pool for lower-priority, compute-intensive tasks. - /// By default ~25% of CPUs or a minimum of 1 thread. - pub low_priority_threadpool: Arc, -} - -impl Default for RayonManager { - fn default() -> Self { - let low_prio_threads = - (num_cpus::get() / DEFAULT_LOW_PRIORITY_DIVISOR).max(MINIMUM_LOW_PRIORITY_THREAD_COUNT); - let low_priority_threadpool = Arc::new( - ThreadPoolBuilder::new() - .num_threads(low_prio_threads) - .build() - .expect("failed to build low-priority rayon pool"), - ); - Self { - low_priority_threadpool, - } - } -} diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 87cdcc45ef..d984d5fedc 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -17,7 +17,6 @@ use beacon_chain::{ store::{HotColdDB, ItemStore, StoreConfig}, }; use beacon_chain::{Kzg, LightClientProducerEvent}; -use beacon_processor::rayon_manager::RayonManager; use beacon_processor::{BeaconProcessor, BeaconProcessorChannels}; use beacon_processor::{BeaconProcessorConfig, BeaconProcessorQueueLengths}; use environment::RuntimeContext; @@ -681,7 +680,6 @@ where 
executor: beacon_processor_context.executor.clone(), current_workers: 0, config: beacon_processor_config, - rayon_manager: RayonManager::default(), } .spawn_manager( beacon_processor_channels.beacon_processor_rx, diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs index 7be8960e69..fe9e0dff70 100644 --- a/beacon_node/http_api/src/test_utils.rs +++ b/beacon_node/http_api/src/test_utils.rs @@ -5,7 +5,6 @@ use beacon_chain::{ }; use beacon_processor::{ BeaconProcessor, BeaconProcessorChannels, BeaconProcessorConfig, BeaconProcessorQueueLengths, - rayon_manager::RayonManager, }; use directory::DEFAULT_ROOT_DIR; use eth2::{BeaconNodeHttpClient, Timeouts}; @@ -248,7 +247,6 @@ pub async fn create_api_server_with_config( executor: test_runtime.task_executor.clone(), current_workers: 0, config: beacon_processor_config, - rayon_manager: RayonManager::default(), } .spawn_manager( beacon_processor_rx, diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 99410bc5e5..4137c974bf 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -17,7 +17,6 @@ use beacon_chain::test_utils::{ test_spec, }; use beacon_chain::{BeaconChain, WhenSlotSkipped}; -use beacon_processor::rayon_manager::RayonManager; use beacon_processor::{work_reprocessing_queue::*, *}; use gossipsub::MessageAcceptance; use itertools::Itertools; @@ -267,7 +266,6 @@ impl TestRig { executor, current_workers: 0, config: beacon_processor_config, - rayon_manager: RayonManager::default(), } .spawn_manager( beacon_processor_rx, diff --git a/common/task_executor/Cargo.toml b/common/task_executor/Cargo.toml index d4faf1e4b8..92a4fc4b59 100644 --- a/common/task_executor/Cargo.toml +++ b/common/task_executor/Cargo.toml @@ -8,6 +8,8 @@ edition = { workspace = true } async-channel = { workspace = true } futures = { workspace = 
true } metrics = { workspace = true } +num_cpus = { workspace = true } +rayon = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } tracing = { workspace = true } diff --git a/common/task_executor/src/lib.rs b/common/task_executor/src/lib.rs index 5f0c822b03..0b8e9f8eba 100644 --- a/common/task_executor/src/lib.rs +++ b/common/task_executor/src/lib.rs @@ -1,12 +1,15 @@ mod metrics; +mod rayon_pool_provider; pub mod test_utils; use futures::channel::mpsc::Sender; use futures::prelude::*; -use std::sync::Weak; +use std::sync::{Arc, Weak}; use tokio::runtime::{Handle, Runtime}; use tracing::debug; +use crate::rayon_pool_provider::RayonPoolProvider; +pub use crate::rayon_pool_provider::RayonPoolType; pub use tokio::task::JoinHandle; /// Provides a reason when Lighthouse is shut down. @@ -84,6 +87,8 @@ pub struct TaskExecutor { // FIXME(sproul): delete? #[allow(dead_code)] service_name: String, + + rayon_pool_provider: Arc, } impl TaskExecutor { @@ -105,6 +110,7 @@ impl TaskExecutor { exit, signal_tx, service_name, + rayon_pool_provider: Arc::new(RayonPoolProvider::default()), } } @@ -115,6 +121,7 @@ impl TaskExecutor { exit: self.exit.clone(), signal_tx: self.signal_tx.clone(), service_name, + rayon_pool_provider: self.rayon_pool_provider.clone(), } } @@ -226,6 +233,47 @@ impl TaskExecutor { } } + /// Spawns a blocking task on a dedicated tokio thread pool and installs a rayon context within it. + pub fn spawn_blocking_with_rayon( + self, + task: F, + rayon_pool_type: RayonPoolType, + name: &'static str, + ) where + F: FnOnce() + Send + 'static, + { + let thread_pool = self.rayon_pool_provider.get_thread_pool(rayon_pool_type); + self.spawn_blocking( + move || { + thread_pool.install(|| { + task(); + }); + }, + name, + ) + } + + /// Spawns a blocking computation on a rayon thread pool and awaits the result. 
+ pub async fn spawn_blocking_with_rayon_async( + &self, + rayon_pool_type: RayonPoolType, + task: F, + ) -> Result + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let thread_pool = self.rayon_pool_provider.get_thread_pool(rayon_pool_type); + let (tx, rx) = tokio::sync::oneshot::channel(); + + thread_pool.spawn(move || { + let result = task(); + let _ = tx.send(result); + }); + + rx.await + } + /// Spawn a future on the tokio runtime wrapped in an `async-channel::Receiver` returning an optional /// join handle to the future. /// The task is cancelled when the corresponding async-channel is dropped. diff --git a/common/task_executor/src/rayon_pool_provider.rs b/common/task_executor/src/rayon_pool_provider.rs new file mode 100644 index 0000000000..8e12f7eaa4 --- /dev/null +++ b/common/task_executor/src/rayon_pool_provider.rs @@ -0,0 +1,58 @@ +use rayon::{ThreadPool, ThreadPoolBuilder}; +use std::sync::Arc; + +const DEFAULT_LOW_PRIORITY_CPU_PERCENTAGE: usize = 25; +const DEFAULT_HIGH_PRIORITY_CPU_PERCENTAGE: usize = 80; +const MINIMUM_THREAD_COUNT: usize = 1; + +pub enum RayonPoolType { + HighPriority, + LowPriority, +} + +pub struct RayonPoolProvider { + /// Smaller rayon thread pool for lower-priority, compute-intensive tasks. + /// By default ~25% of CPUs or a minimum of 1 thread. + low_priority_thread_pool: Arc, + /// Larger rayon thread pool for high-priority, compute-intensive tasks. + /// By default ~80% of CPUs or a minimum of 1 thread. Critical/highest + /// priority tasks should use the global pool instead. 
+ high_priority_thread_pool: Arc, +} + +impl Default for RayonPoolProvider { + fn default() -> Self { + let low_prio_threads = + (num_cpus::get() * DEFAULT_LOW_PRIORITY_CPU_PERCENTAGE / 100).max(MINIMUM_THREAD_COUNT); + let low_priority_thread_pool = Arc::new( + ThreadPoolBuilder::new() + .num_threads(low_prio_threads) + .build() + .expect("failed to build low-priority rayon pool"), + ); + + let high_prio_threads = (num_cpus::get() * DEFAULT_HIGH_PRIORITY_CPU_PERCENTAGE / 100) + .max(MINIMUM_THREAD_COUNT); + let high_priority_thread_pool = Arc::new( + ThreadPoolBuilder::new() + .num_threads(high_prio_threads) + .build() + .expect("failed to build high-priority rayon pool"), + ); + Self { + low_priority_thread_pool, + high_priority_thread_pool, + } + } +} + +impl RayonPoolProvider { + /// Get a scoped thread pool by priority level. + /// For critical/highest priority tasks, use the global pool instead. + pub fn get_thread_pool(&self, rayon_pool_type: RayonPoolType) -> Arc { + match rayon_pool_type { + RayonPoolType::HighPriority => self.high_priority_thread_pool.clone(), + RayonPoolType::LowPriority => self.low_priority_thread_pool.clone(), + } + } +}