Run reconstruction inside a scoped rayon pool (#8075)

Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>

Co-Authored-By: Eitan Seri-Levi <eserilev@gmail.com>

Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu>
This commit is contained in:
Eitan Seri-Levi
2025-09-23 23:37:34 -07:00
committed by GitHub
parent d80c0ff5b5
commit af274029e8
11 changed files with 123 additions and 60 deletions

3
Cargo.lock generated
View File

@@ -980,7 +980,6 @@ dependencies = [
"metrics", "metrics",
"num_cpus", "num_cpus",
"parking_lot 0.12.3", "parking_lot 0.12.3",
"rayon",
"serde", "serde",
"slot_clock", "slot_clock",
"strum", "strum",
@@ -9232,6 +9231,8 @@ dependencies = [
"async-channel 1.9.0", "async-channel 1.9.0",
"futures", "futures",
"metrics", "metrics",
"num_cpus",
"rayon",
"tokio", "tokio",
"tracing", "tracing",
] ]

View File

@@ -124,7 +124,7 @@ use store::{
BlobSidecarListFromRoot, DBColumn, DatabaseBlock, Error as DBError, HotColdDB, HotStateSummary, BlobSidecarListFromRoot, DBColumn, DatabaseBlock, Error as DBError, HotColdDB, HotStateSummary,
KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp,
}; };
use task_executor::{ShutdownReason, TaskExecutor}; use task_executor::{RayonPoolType, ShutdownReason, TaskExecutor};
use tokio_stream::Stream; use tokio_stream::Stream;
use tracing::{Span, debug, debug_span, error, info, info_span, instrument, trace, warn}; use tracing::{Span, debug, debug_span, error, info, info_span, instrument, trace, warn};
use tree_hash::TreeHash; use tree_hash::TreeHash;
@@ -3274,16 +3274,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let current_span = Span::current(); let current_span = Span::current();
let result = self let result = self
.task_executor .task_executor
.spawn_blocking_handle( .spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || {
move || { let _guard = current_span.enter();
let _guard = current_span.enter(); data_availability_checker.reconstruct_data_columns(&block_root)
data_availability_checker.reconstruct_data_columns(&block_root) })
},
"reconstruct_data_columns",
)
.ok_or(BeaconChainError::RuntimeShutdown)?
.await .await
.map_err(BeaconChainError::TokioJoin)??; .map_err(|_| BeaconChainError::RuntimeShutdown)??;
match result { match result {
DataColumnReconstructionResult::Success((availability, data_columns_to_publish)) => { DataColumnReconstructionResult::Success((availability, data_columns_to_publish)) => {

View File

@@ -12,7 +12,6 @@ logging = { workspace = true }
metrics = { workspace = true } metrics = { workspace = true }
num_cpus = { workspace = true } num_cpus = { workspace = true }
parking_lot = { workspace = true } parking_lot = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
slot_clock = { workspace = true } slot_clock = { workspace = true }
strum = { workspace = true } strum = { workspace = true }

View File

@@ -38,7 +38,6 @@
//! checks the queues to see if there are more parcels of work that can be spawned in a new worker //! checks the queues to see if there are more parcels of work that can be spawned in a new worker
//! task. //! task.
use crate::rayon_manager::RayonManager;
use crate::work_reprocessing_queue::{ use crate::work_reprocessing_queue::{
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage, QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
}; };
@@ -48,7 +47,6 @@ use lighthouse_network::{MessageId, NetworkGlobals, PeerId};
use logging::TimeLatch; use logging::TimeLatch;
use logging::crit; use logging::crit;
use parking_lot::Mutex; use parking_lot::Mutex;
use rayon::ThreadPool;
pub use scheduler::work_reprocessing_queue; pub use scheduler::work_reprocessing_queue;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use slot_clock::SlotClock; use slot_clock::SlotClock;
@@ -61,7 +59,7 @@ use std::sync::Arc;
use std::task::Context; use std::task::Context;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use strum::IntoStaticStr; use strum::IntoStaticStr;
use task_executor::TaskExecutor; use task_executor::{RayonPoolType, TaskExecutor};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::error::TrySendError;
use tracing::{debug, error, trace, warn}; use tracing::{debug, error, trace, warn};
@@ -76,7 +74,6 @@ use work_reprocessing_queue::{
}; };
mod metrics; mod metrics;
pub mod rayon_manager;
pub mod scheduler; pub mod scheduler;
/// The maximum size of the channel for work events to the `BeaconProcessor`. /// The maximum size of the channel for work events to the `BeaconProcessor`.
@@ -810,7 +807,6 @@ pub struct BeaconProcessor<E: EthSpec> {
pub network_globals: Arc<NetworkGlobals<E>>, pub network_globals: Arc<NetworkGlobals<E>>,
pub executor: TaskExecutor, pub executor: TaskExecutor,
pub current_workers: usize, pub current_workers: usize,
pub rayon_manager: RayonManager,
pub config: BeaconProcessorConfig, pub config: BeaconProcessorConfig,
} }
@@ -1609,10 +1605,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
} }
Work::ChainSegmentBackfill(process_fn) => { Work::ChainSegmentBackfill(process_fn) => {
if self.config.enable_backfill_rate_limiting { if self.config.enable_backfill_rate_limiting {
task_spawner.spawn_blocking_with_rayon( task_spawner.spawn_blocking_with_rayon(RayonPoolType::LowPriority, process_fn)
self.rayon_manager.low_priority_threadpool.clone(),
process_fn,
)
} else { } else {
// use the global rayon thread pool if backfill rate limiting is disabled. // use the global rayon thread pool if backfill rate limiting is disabled.
task_spawner.spawn_blocking(process_fn) task_spawner.spawn_blocking(process_fn)
@@ -1681,17 +1674,16 @@ impl TaskSpawner {
} }
/// Spawns a blocking task on a rayon thread pool, dropping the `SendOnDrop` after task completion. /// Spawns a blocking task on a rayon thread pool, dropping the `SendOnDrop` after task completion.
fn spawn_blocking_with_rayon<F>(self, thread_pool: Arc<ThreadPool>, task: F) fn spawn_blocking_with_rayon<F>(self, rayon_pool_type: RayonPoolType, task: F)
where where
F: FnOnce() + Send + 'static, F: FnOnce() + Send + 'static,
{ {
self.executor.spawn_blocking( self.executor.spawn_blocking_with_rayon(
move || { move || {
thread_pool.install(|| { task();
task();
});
drop(self.send_idle_on_drop) drop(self.send_idle_on_drop)
}, },
rayon_pool_type,
WORKER_TASK_NAME, WORKER_TASK_NAME,
) )
} }

View File

@@ -1,27 +0,0 @@
use rayon::{ThreadPool, ThreadPoolBuilder};
use std::sync::Arc;
// Divisor applied to the logical CPU count to size the low-priority pool (~25% of CPUs).
const DEFAULT_LOW_PRIORITY_DIVISOR: usize = 4;
// Floor on the low-priority pool size so it always has at least one thread.
const MINIMUM_LOW_PRIORITY_THREAD_COUNT: usize = 1;
/// Owns the scoped rayon thread pool used to keep lower-priority compute work
/// off the global rayon pool.
pub struct RayonManager {
    /// Smaller rayon thread pool for lower-priority, compute-intensive tasks.
    /// By default ~25% of CPUs or a minimum of 1 thread.
    pub low_priority_threadpool: Arc<ThreadPool>,
}
impl Default for RayonManager {
    fn default() -> Self {
        // ~25% of logical CPUs (CPU count / 4), but never fewer than one thread.
        let low_prio_threads =
            (num_cpus::get() / DEFAULT_LOW_PRIORITY_DIVISOR).max(MINIMUM_LOW_PRIORITY_THREAD_COUNT);
        let low_priority_threadpool = Arc::new(
            ThreadPoolBuilder::new()
                .num_threads(low_prio_threads)
                .build()
                // Pool construction failure is treated as fatal at startup.
                .expect("failed to build low-priority rayon pool"),
        );
        Self {
            low_priority_threadpool,
        }
    }
}

View File

@@ -17,7 +17,6 @@ use beacon_chain::{
store::{HotColdDB, ItemStore, StoreConfig}, store::{HotColdDB, ItemStore, StoreConfig},
}; };
use beacon_chain::{Kzg, LightClientProducerEvent}; use beacon_chain::{Kzg, LightClientProducerEvent};
use beacon_processor::rayon_manager::RayonManager;
use beacon_processor::{BeaconProcessor, BeaconProcessorChannels}; use beacon_processor::{BeaconProcessor, BeaconProcessorChannels};
use beacon_processor::{BeaconProcessorConfig, BeaconProcessorQueueLengths}; use beacon_processor::{BeaconProcessorConfig, BeaconProcessorQueueLengths};
use environment::RuntimeContext; use environment::RuntimeContext;
@@ -681,7 +680,6 @@ where
executor: beacon_processor_context.executor.clone(), executor: beacon_processor_context.executor.clone(),
current_workers: 0, current_workers: 0,
config: beacon_processor_config, config: beacon_processor_config,
rayon_manager: RayonManager::default(),
} }
.spawn_manager( .spawn_manager(
beacon_processor_channels.beacon_processor_rx, beacon_processor_channels.beacon_processor_rx,

View File

@@ -5,7 +5,6 @@ use beacon_chain::{
}; };
use beacon_processor::{ use beacon_processor::{
BeaconProcessor, BeaconProcessorChannels, BeaconProcessorConfig, BeaconProcessorQueueLengths, BeaconProcessor, BeaconProcessorChannels, BeaconProcessorConfig, BeaconProcessorQueueLengths,
rayon_manager::RayonManager,
}; };
use directory::DEFAULT_ROOT_DIR; use directory::DEFAULT_ROOT_DIR;
use eth2::{BeaconNodeHttpClient, Timeouts}; use eth2::{BeaconNodeHttpClient, Timeouts};
@@ -248,7 +247,6 @@ pub async fn create_api_server_with_config<T: BeaconChainTypes>(
executor: test_runtime.task_executor.clone(), executor: test_runtime.task_executor.clone(),
current_workers: 0, current_workers: 0,
config: beacon_processor_config, config: beacon_processor_config,
rayon_manager: RayonManager::default(),
} }
.spawn_manager( .spawn_manager(
beacon_processor_rx, beacon_processor_rx,

View File

@@ -17,7 +17,6 @@ use beacon_chain::test_utils::{
test_spec, test_spec,
}; };
use beacon_chain::{BeaconChain, WhenSlotSkipped}; use beacon_chain::{BeaconChain, WhenSlotSkipped};
use beacon_processor::rayon_manager::RayonManager;
use beacon_processor::{work_reprocessing_queue::*, *}; use beacon_processor::{work_reprocessing_queue::*, *};
use gossipsub::MessageAcceptance; use gossipsub::MessageAcceptance;
use itertools::Itertools; use itertools::Itertools;
@@ -267,7 +266,6 @@ impl TestRig {
executor, executor,
current_workers: 0, current_workers: 0,
config: beacon_processor_config, config: beacon_processor_config,
rayon_manager: RayonManager::default(),
} }
.spawn_manager( .spawn_manager(
beacon_processor_rx, beacon_processor_rx,

View File

@@ -8,6 +8,8 @@ edition = { workspace = true }
async-channel = { workspace = true } async-channel = { workspace = true }
futures = { workspace = true } futures = { workspace = true }
metrics = { workspace = true } metrics = { workspace = true }
num_cpus = { workspace = true }
rayon = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tracing = { workspace = true } tracing = { workspace = true }

View File

@@ -1,12 +1,15 @@
mod metrics; mod metrics;
mod rayon_pool_provider;
pub mod test_utils; pub mod test_utils;
use futures::channel::mpsc::Sender; use futures::channel::mpsc::Sender;
use futures::prelude::*; use futures::prelude::*;
use std::sync::Weak; use std::sync::{Arc, Weak};
use tokio::runtime::{Handle, Runtime}; use tokio::runtime::{Handle, Runtime};
use tracing::debug; use tracing::debug;
use crate::rayon_pool_provider::RayonPoolProvider;
pub use crate::rayon_pool_provider::RayonPoolType;
pub use tokio::task::JoinHandle; pub use tokio::task::JoinHandle;
/// Provides a reason when Lighthouse is shut down. /// Provides a reason when Lighthouse is shut down.
@@ -84,6 +87,8 @@ pub struct TaskExecutor {
// FIXME(sproul): delete? // FIXME(sproul): delete?
#[allow(dead_code)] #[allow(dead_code)]
service_name: String, service_name: String,
rayon_pool_provider: Arc<RayonPoolProvider>,
} }
impl TaskExecutor { impl TaskExecutor {
@@ -105,6 +110,7 @@ impl TaskExecutor {
exit, exit,
signal_tx, signal_tx,
service_name, service_name,
rayon_pool_provider: Arc::new(RayonPoolProvider::default()),
} }
} }
@@ -115,6 +121,7 @@ impl TaskExecutor {
exit: self.exit.clone(), exit: self.exit.clone(),
signal_tx: self.signal_tx.clone(), signal_tx: self.signal_tx.clone(),
service_name, service_name,
rayon_pool_provider: self.rayon_pool_provider.clone(),
} }
} }
@@ -226,6 +233,47 @@ impl TaskExecutor {
} }
} }
/// Spawns a blocking task on a dedicated tokio thread pool and installs a rayon context within it.
pub fn spawn_blocking_with_rayon<F>(
self,
task: F,
rayon_pool_type: RayonPoolType,
name: &'static str,
) where
F: FnOnce() + Send + 'static,
{
let thread_pool = self.rayon_pool_provider.get_thread_pool(rayon_pool_type);
self.spawn_blocking(
move || {
thread_pool.install(|| {
task();
});
},
name,
)
}
/// Spawns a blocking computation on a rayon thread pool and awaits the result.
pub async fn spawn_blocking_with_rayon_async<F, R>(
&self,
rayon_pool_type: RayonPoolType,
task: F,
) -> Result<R, tokio::sync::oneshot::error::RecvError>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let thread_pool = self.rayon_pool_provider.get_thread_pool(rayon_pool_type);
let (tx, rx) = tokio::sync::oneshot::channel();
thread_pool.spawn(move || {
let result = task();
let _ = tx.send(result);
});
rx.await
}
/// Spawn a future on the tokio runtime wrapped in an `async-channel::Receiver` returning an optional /// Spawn a future on the tokio runtime wrapped in an `async-channel::Receiver` returning an optional
/// join handle to the future. /// join handle to the future.
/// The task is cancelled when the corresponding async-channel is dropped. /// The task is cancelled when the corresponding async-channel is dropped.

View File

@@ -0,0 +1,58 @@
use rayon::{ThreadPool, ThreadPoolBuilder};
use std::sync::Arc;
// Percentage of logical CPUs allotted to the low-priority scoped pool.
const DEFAULT_LOW_PRIORITY_CPU_PERCENTAGE: usize = 25;
// Percentage of logical CPUs allotted to the high-priority scoped pool.
const DEFAULT_HIGH_PRIORITY_CPU_PERCENTAGE: usize = 80;
// Floor applied to both pool sizes so each pool always has at least one thread.
const MINIMUM_THREAD_COUNT: usize = 1;
/// Priority level used to select one of the scoped rayon thread pools.
///
/// Critical/highest-priority tasks should use the global rayon pool instead
/// of either scoped pool.
// Derives added: a small fieldless selector enum that callers pass by value
// should be `Copy` (and `Debug`/`Eq` for logging and tests).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RayonPoolType {
    /// Selects the larger pool (~80% of CPUs by default).
    HighPriority,
    /// Selects the smaller pool (~25% of CPUs by default).
    LowPriority,
}
/// Holds the scoped rayon thread pools used to isolate compute-heavy work
/// from the global rayon pool.
pub struct RayonPoolProvider {
    /// Smaller rayon thread pool for lower-priority, compute-intensive tasks.
    /// By default ~25% of CPUs or a minimum of 1 thread.
    low_priority_thread_pool: Arc<ThreadPool>,
    /// Larger rayon thread pool for high-priority, compute-intensive tasks.
    /// By default ~80% of CPUs or a minimum of 1 thread. Critical/highest
    /// priority tasks should use the global pool instead.
    high_priority_thread_pool: Arc<ThreadPool>,
}
impl Default for RayonPoolProvider {
fn default() -> Self {
let low_prio_threads =
(num_cpus::get() * DEFAULT_LOW_PRIORITY_CPU_PERCENTAGE / 100).max(MINIMUM_THREAD_COUNT);
let low_priority_thread_pool = Arc::new(
ThreadPoolBuilder::new()
.num_threads(low_prio_threads)
.build()
.expect("failed to build low-priority rayon pool"),
);
let high_prio_threads = (num_cpus::get() * DEFAULT_HIGH_PRIORITY_CPU_PERCENTAGE / 100)
.max(MINIMUM_THREAD_COUNT);
let high_priority_thread_pool = Arc::new(
ThreadPoolBuilder::new()
.num_threads(high_prio_threads)
.build()
.expect("failed to build high-priority rayon pool"),
);
Self {
low_priority_thread_pool,
high_priority_thread_pool,
}
}
}
impl RayonPoolProvider {
/// Get a scoped thread pool by priority level.
/// For critical/highest priority tasks, use the global pool instead.
pub fn get_thread_pool(&self, rayon_pool_type: RayonPoolType) -> Arc<ThreadPool> {
match rayon_pool_type {
RayonPoolType::HighPriority => self.high_priority_thread_pool.clone(),
RayonPoolType::LowPriority => self.low_priority_thread_pool.clone(),
}
}
}