diff --git a/Cargo.lock b/Cargo.lock index ba6a4587b6..0e55918243 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -980,6 +980,7 @@ dependencies = [ "metrics", "num_cpus", "parking_lot 0.12.3", + "rayon", "serde", "slot_clock", "strum", diff --git a/beacon_node/beacon_processor/Cargo.toml b/beacon_node/beacon_processor/Cargo.toml index afd4660c9a..262badf7f9 100644 --- a/beacon_node/beacon_processor/Cargo.toml +++ b/beacon_node/beacon_processor/Cargo.toml @@ -12,6 +12,7 @@ logging = { workspace = true } metrics = { workspace = true } num_cpus = { workspace = true } parking_lot = { workspace = true } +rayon = { workspace = true } serde = { workspace = true } slot_clock = { workspace = true } strum = { workspace = true } diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 84723fb6a0..64aeb4ceaf 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -38,6 +38,7 @@ //! checks the queues to see if there are more parcels of work that can be spawned in a new worker //! task. +use crate::rayon_manager::RayonManager; use crate::work_reprocessing_queue::{ QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage, }; @@ -47,6 +48,7 @@ use lighthouse_network::{MessageId, NetworkGlobals, PeerId}; use logging::TimeLatch; use logging::crit; use parking_lot::Mutex; +use rayon::ThreadPool; pub use scheduler::work_reprocessing_queue; use serde::{Deserialize, Serialize}; use slot_clock::SlotClock; @@ -74,6 +76,7 @@ use work_reprocessing_queue::{ }; mod metrics; +pub mod rayon_manager; pub mod scheduler; /// The maximum size of the channel for work events to the `BeaconProcessor`. @@ -603,7 +606,7 @@ pub enum Work { process_fn: BlockingFn, }, ChainSegment(AsyncFn), - ChainSegmentBackfill(AsyncFn), + ChainSegmentBackfill(BlockingFn), Status(BlockingFn), BlocksByRangeRequest(AsyncFn), BlocksByRootsRequest(AsyncFn), @@ -807,6 +810,7 @@ pub struct BeaconProcessor { pub network_globals: Arc>, pub executor: TaskExecutor, pub current_workers: usize, + pub rayon_manager: RayonManager, pub config: BeaconProcessorConfig, } @@ -1603,7 +1607,17 @@ impl BeaconProcessor { Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => { task_spawner.spawn_async(work) } - Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn), + Work::ChainSegmentBackfill(process_fn) => { + if self.config.enable_backfill_rate_limiting { + task_spawner.spawn_blocking_with_rayon( + self.rayon_manager.low_priority_threadpool.clone(), + process_fn, + ) + } else { + // use the global rayon thread pool if backfill rate limiting is disabled. + task_spawner.spawn_blocking(process_fn) + } + } Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn { BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn), BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn), @@ -1665,6 +1679,22 @@ impl TaskSpawner { WORKER_TASK_NAME, ) } + + /// Spawns a blocking task on a rayon thread pool, dropping the `SendOnDrop` after task completion. + fn spawn_blocking_with_rayon(self, thread_pool: Arc, task: F) + where + F: FnOnce() + Send + 'static, + { + self.executor.spawn_blocking( + move || { + thread_pool.install(|| { + task(); + }); + drop(self.send_idle_on_drop) + }, + WORKER_TASK_NAME, + ) + } } /// This struct will send a message on `self.tx` when it is dropped. An error will be logged diff --git a/beacon_node/beacon_processor/src/rayon_manager.rs b/beacon_node/beacon_processor/src/rayon_manager.rs new file mode 100644 index 0000000000..99fe32d5cc --- /dev/null +++ b/beacon_node/beacon_processor/src/rayon_manager.rs @@ -0,0 +1,27 @@ +use rayon::{ThreadPool, ThreadPoolBuilder}; +use std::sync::Arc; + +const DEFAULT_LOW_PRIORITY_DIVISOR: usize = 4; +const MINIMUM_LOW_PRIORITY_THREAD_COUNT: usize = 1; + +pub struct RayonManager { + /// Smaller rayon thread pool for lower-priority, compute-intensive tasks. + /// By default ~25% of CPUs or a minimum of 1 thread. + pub low_priority_threadpool: Arc, +} + +impl Default for RayonManager { + fn default() -> Self { + let low_prio_threads = + (num_cpus::get() / DEFAULT_LOW_PRIORITY_DIVISOR).max(MINIMUM_LOW_PRIORITY_THREAD_COUNT); + let low_priority_threadpool = Arc::new( + ThreadPoolBuilder::new() + .num_threads(low_prio_threads) + .build() + .expect("failed to build low-priority rayon pool"), + ); + Self { + low_priority_threadpool, + } + } +} diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index 3e755f0830..8c33cf5869 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -173,7 +173,7 @@ pub struct IgnoredRpcBlock { } /// A backfill batch work that has been queued for processing later. -pub struct QueuedBackfillBatch(pub AsyncFn); +pub struct QueuedBackfillBatch(pub BlockingFn); pub struct QueuedColumnReconstruction { pub block_root: Hash256, @@ -1084,7 +1084,7 @@ mod tests { // Now queue a backfill sync batch. work_reprocessing_tx .try_send(ReprocessQueueMessage::BackfillSync(QueuedBackfillBatch( - Box::pin(async {}), + Box::new(|| {}), ))) .unwrap(); tokio::task::yield_now().await; diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index d984d5fedc..87cdcc45ef 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -17,6 +17,7 @@ use beacon_chain::{ store::{HotColdDB, ItemStore, StoreConfig}, }; use beacon_chain::{Kzg, LightClientProducerEvent}; +use beacon_processor::rayon_manager::RayonManager; use beacon_processor::{BeaconProcessor, BeaconProcessorChannels}; use beacon_processor::{BeaconProcessorConfig, BeaconProcessorQueueLengths}; use environment::RuntimeContext; @@ -680,6 +681,7 @@ where executor: beacon_processor_context.executor.clone(), current_workers: 0, config: beacon_processor_config, + rayon_manager: RayonManager::default(), } .spawn_manager( beacon_processor_channels.beacon_processor_rx, diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs index fe9e0dff70..7be8960e69 100644 --- a/beacon_node/http_api/src/test_utils.rs +++ b/beacon_node/http_api/src/test_utils.rs @@ -5,6 +5,7 @@ use beacon_chain::{ }; use beacon_processor::{ BeaconProcessor, BeaconProcessorChannels, BeaconProcessorConfig, BeaconProcessorQueueLengths, + rayon_manager::RayonManager, }; use directory::DEFAULT_ROOT_DIR; use eth2::{BeaconNodeHttpClient, Timeouts}; @@ -247,6 +248,7 @@ pub async fn create_api_server_with_config( executor: test_runtime.task_executor.clone(), current_workers: 0, config: beacon_processor_config, + rayon_manager: RayonManager::default(), } .spawn_manager( beacon_processor_rx, diff --git a/beacon_node/lighthouse_tracing/src/lib.rs b/beacon_node/lighthouse_tracing/src/lib.rs index 60fda12cc2..18a9874252 100644 --- a/beacon_node/lighthouse_tracing/src/lib.rs +++ b/beacon_node/lighthouse_tracing/src/lib.rs @@ -26,6 +26,7 @@ pub const SPAN_PROCESS_RPC_BLOCK: &str = "process_rpc_block"; pub const SPAN_PROCESS_RPC_BLOBS: &str = "process_rpc_blobs"; pub const SPAN_PROCESS_RPC_CUSTODY_COLUMNS: &str = "process_rpc_custody_columns"; pub const SPAN_PROCESS_CHAIN_SEGMENT: &str = "process_chain_segment"; +pub const SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL: &str = "process_chain_segment_backfill"; /// Fork choice root spans pub const SPAN_RECOMPUTE_HEAD: &str = "recompute_head_at_slot"; @@ -61,6 +62,7 @@ pub const LH_BN_ROOT_SPAN_NAMES: &[&str] = &[ SPAN_PROCESS_RPC_BLOBS, SPAN_PROCESS_RPC_CUSTODY_COLUMNS, SPAN_PROCESS_CHAIN_SEGMENT, + SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL, SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST, SPAN_HANDLE_BLOBS_BY_RANGE_REQUEST, SPAN_HANDLE_DATA_COLUMNS_BY_RANGE_REQUEST, diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 691c06f268..85ccde1d59 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -6,9 +6,7 @@ use beacon_chain::data_column_verification::{GossipDataColumnError, observe_goss use beacon_chain::fetch_blobs::{ EngineGetBlobsOutput, FetchEngineBlobError, fetch_and_process_engine_blobs, }; -use beacon_chain::{ - AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer, -}; +use beacon_chain::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError}; use beacon_processor::{ BeaconProcessorSend, DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work, WorkEvent as BeaconWorkEvent, @@ -500,33 +498,23 @@ impl NetworkBeaconProcessor { process_id: ChainSegmentProcessId, blocks: Vec>, ) -> Result<(), Error> { - let is_backfill = matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. }); debug!(blocks = blocks.len(), id = ?process_id, "Batch sending for process"); - let processor = self.clone(); - let process_fn = async move { - let notify_execution_layer = if processor - .network_globals - .sync_state - .read() - .is_syncing_finalized() - { - NotifyExecutionLayer::No - } else { - NotifyExecutionLayer::Yes - }; - processor - .process_chain_segment(process_id, blocks, notify_execution_layer) - .await; - }; - let process_fn = Box::pin(process_fn); // Back-sync batches are dispatched with a different `Work` variant so // they can be rate-limited. - let work = if is_backfill { - Work::ChainSegmentBackfill(process_fn) - } else { - Work::ChainSegment(process_fn) + let work = match process_id { + ChainSegmentProcessId::RangeBatchId(_, _) => { + let process_fn = async move { + processor.process_chain_segment(process_id, blocks).await; + }; + Work::ChainSegment(Box::pin(process_fn)) + } + ChainSegmentProcessId::BackSyncBatchId(_) => { + let process_fn = + move || processor.process_chain_segment_backfill(process_id, blocks); + Work::ChainSegmentBackfill(Box::new(process_fn)) + } }; self.try_send(BeaconWorkEvent { diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index edeed7e98c..b61a6e25c5 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -19,9 +19,10 @@ use beacon_processor::{ use beacon_processor::{Work, WorkEvent}; use lighthouse_network::PeerAction; use lighthouse_tracing::{ - SPAN_PROCESS_CHAIN_SEGMENT, SPAN_PROCESS_RPC_BLOBS, SPAN_PROCESS_RPC_BLOCK, - SPAN_PROCESS_RPC_CUSTODY_COLUMNS, + SPAN_PROCESS_CHAIN_SEGMENT, SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL, SPAN_PROCESS_RPC_BLOBS, + SPAN_PROCESS_RPC_BLOCK, SPAN_PROCESS_RPC_CUSTODY_COLUMNS, }; +use logging::crit; use std::sync::Arc; use std::time::Duration; use store::KzgCommitment; @@ -434,27 +435,42 @@ impl NetworkBeaconProcessor { parent = None, level = "debug", skip_all, - fields(sync_type = ?sync_type, downloaded_blocks = downloaded_blocks.len()) + fields(process_id = ?process_id, downloaded_blocks = downloaded_blocks.len()) )] pub async fn process_chain_segment( &self, - sync_type: ChainSegmentProcessId, + process_id: ChainSegmentProcessId, downloaded_blocks: Vec>, - notify_execution_layer: NotifyExecutionLayer, ) { - let result = match sync_type { - // this a request from the range sync - ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => { - let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64()); - let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64()); - let sent_blocks = downloaded_blocks.len(); + let ChainSegmentProcessId::RangeBatchId(chain_id, epoch) = process_id else { + // This is a request from range sync, this should _never_ happen + crit!( + error = "process_chain_segment called on a variant other than RangeBatchId", + "Please notify the devs" + ); + return; + }; - match self - .process_blocks(downloaded_blocks.iter(), notify_execution_layer) - .await - { - (imported_blocks, Ok(_)) => { - debug!( + let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64()); + let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64()); + let sent_blocks = downloaded_blocks.len(); + let notify_execution_layer = if self + .network_globals + .sync_state + .read() + .is_syncing_finalized() + { + NotifyExecutionLayer::No + } else { + NotifyExecutionLayer::Yes + }; + + let result = match self + .process_blocks(downloaded_blocks.iter(), notify_execution_layer) + .await + { + (imported_blocks, Ok(_)) => { + debug!( batch_epoch = %epoch, first_block_slot = start_slot, chain = chain_id, @@ -462,13 +478,13 @@ impl NetworkBeaconProcessor { processed_blocks = sent_blocks, service= "sync", "Batch processed"); - BatchProcessResult::Success { - sent_blocks, - imported_blocks, - } - } - (imported_blocks, Err(e)) => { - debug!( + BatchProcessResult::Success { + sent_blocks, + imported_blocks, + } + } + (imported_blocks, Err(e)) => { + debug!( batch_epoch = %epoch, first_block_slot = start_slot, chain = chain_id, @@ -477,33 +493,61 @@ impl NetworkBeaconProcessor { error = %e.message, service = "sync", "Batch processing failed"); - match e.peer_action { - Some(penalty) => BatchProcessResult::FaultyFailure { - imported_blocks, - penalty, - }, - None => BatchProcessResult::NonFaultyFailure, - } - } + match e.peer_action { + Some(penalty) => BatchProcessResult::FaultyFailure { + imported_blocks, + penalty, + }, + None => BatchProcessResult::NonFaultyFailure, } } - // this a request from the Backfill sync - ChainSegmentProcessId::BackSyncBatchId(epoch) => { - let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64()); - let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64()); - let sent_blocks = downloaded_blocks.len(); - let n_blobs = downloaded_blocks - .iter() - .map(|wrapped| wrapped.n_blobs()) - .sum::(); - let n_data_columns = downloaded_blocks - .iter() - .map(|wrapped| wrapped.n_data_columns()) - .sum::(); + }; - match self.process_backfill_blocks(downloaded_blocks) { - (imported_blocks, Ok(_)) => { - debug!( + self.send_sync_message(SyncMessage::BatchProcessed { + sync_type: process_id, + result, + }); + } + + /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync + /// thread if more blocks are needed to process it. + #[instrument( + name = SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL, + parent = None, + level = "debug", + skip_all, + fields(downloaded_blocks = downloaded_blocks.len()) + )] + pub fn process_chain_segment_backfill( + &self, + process_id: ChainSegmentProcessId, + downloaded_blocks: Vec>, + ) { + let ChainSegmentProcessId::BackSyncBatchId(epoch) = process_id else { + // this a request from RangeSync, this should _never_ happen + crit!( + error = + "process_chain_segment_backfill called on a variant other than BackSyncBatchId", + "Please notify the devs" + ); + return; + }; + + let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64()); + let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64()); + let sent_blocks = downloaded_blocks.len(); + let n_blobs = downloaded_blocks + .iter() + .map(|wrapped| wrapped.n_blobs()) + .sum::(); + let n_data_columns = downloaded_blocks + .iter() + .map(|wrapped| wrapped.n_data_columns()) + .sum::(); + + let result = match self.process_backfill_blocks(downloaded_blocks) { + (imported_blocks, Ok(_)) => { + debug!( batch_epoch = %epoch, first_block_slot = start_slot, keep_execution_payload = !self.chain.store.get_config().prune_payloads, @@ -513,34 +557,35 @@ impl NetworkBeaconProcessor { processed_data_columns = n_data_columns, service= "sync", "Backfill batch processed"); - BatchProcessResult::Success { - sent_blocks, - imported_blocks, - } - } - (_, Err(e)) => { - debug!( - batch_epoch = %epoch, - first_block_slot = start_slot, - last_block_slot = end_slot, - processed_blobs = n_blobs, - error = %e.message, - service = "sync", - "Backfill batch processing failed" - ); - match e.peer_action { - Some(penalty) => BatchProcessResult::FaultyFailure { - imported_blocks: 0, - penalty, - }, - None => BatchProcessResult::NonFaultyFailure, - } - } + BatchProcessResult::Success { + sent_blocks, + imported_blocks, + } + } + (_, Err(e)) => { + debug!( + batch_epoch = %epoch, + first_block_slot = start_slot, + last_block_slot = end_slot, + processed_blobs = n_blobs, + error = %e.message, + service = "sync", + "Backfill batch processing failed" + ); + match e.peer_action { + Some(penalty) => BatchProcessResult::FaultyFailure { + imported_blocks: 0, + penalty, + }, + None => BatchProcessResult::NonFaultyFailure, } } }; - self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result }); + self.send_sync_message(SyncMessage::BatchProcessed { + sync_type: process_id, + result, + }); } /// Helper function to process blocks batches which only consumes the chain and blocks to process. diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index d3a93d4863..99410bc5e5 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -17,6 +17,7 @@ use beacon_chain::test_utils::{ test_spec, }; use beacon_chain::{BeaconChain, WhenSlotSkipped}; +use beacon_processor::rayon_manager::RayonManager; use beacon_processor::{work_reprocessing_queue::*, *}; use gossipsub::MessageAcceptance; use itertools::Itertools; @@ -266,6 +267,7 @@ impl TestRig { executor, current_workers: 0, config: beacon_processor_config, + rayon_manager: RayonManager::default(), } .spawn_manager( beacon_processor_rx, @@ -458,10 +460,10 @@ impl TestRig { .unwrap(); } - pub fn enqueue_backfill_batch(&self) { + pub fn enqueue_backfill_batch(&self, epoch: Epoch) { self.network_beacon_processor .send_chain_segment( - ChainSegmentProcessId::BackSyncBatchId(Epoch::default()), + ChainSegmentProcessId::BackSyncBatchId(epoch), Vec::default(), ) .unwrap(); @@ -606,7 +608,7 @@ impl TestRig { } pub async fn assert_event_journal(&mut self, expected: &[&str]) { - self.assert_event_journal_with_timeout(expected, STANDARD_TIMEOUT) + self.assert_event_journal_with_timeout(expected, STANDARD_TIMEOUT, false, false) .await } @@ -623,6 +625,8 @@ impl TestRig { .chain(std::iter::once(NOTHING_TO_DO)) .collect::>(), timeout, + false, + false, ) .await } @@ -666,11 +670,21 @@ impl TestRig { &mut self, expected: &[&str], timeout: Duration, + ignore_worker_freed: bool, + ignore_nothing_to_do: bool, ) { let mut events = Vec::with_capacity(expected.len()); let drain_future = async { while let Some(event) = self.work_journal_rx.recv().await { + if event == WORKER_FREED && ignore_worker_freed { + continue; + } + + if event == NOTHING_TO_DO && ignore_nothing_to_do { + continue; + } + events.push(event); // Break as soon as we collect the desired number of events. @@ -1384,6 +1398,8 @@ async fn requeue_unknown_block_gossip_attestation_without_import() { NOTHING_TO_DO, ], Duration::from_secs(1) + QUEUED_ATTESTATION_DELAY, + false, + false, ) .await; @@ -1424,6 +1440,8 @@ async fn requeue_unknown_block_gossip_aggregated_attestation_without_import() { NOTHING_TO_DO, ], Duration::from_secs(1) + QUEUED_ATTESTATION_DELAY, + false, + false, ) .await; @@ -1558,8 +1576,8 @@ async fn test_backfill_sync_processing() { // (not straight forward to manipulate `TestingSlotClock` due to cloning of `SlotClock` in code) // and makes the test very slow, hence timing calculation is unit tested separately in // `work_reprocessing_queue`. - for _ in 0..1 { - rig.enqueue_backfill_batch(); + for i in 0..1 { + rig.enqueue_backfill_batch(Epoch::new(i)); // ensure queued batch is not processed until later rig.assert_no_events_for(Duration::from_millis(100)).await; // A new batch should be processed within a slot. @@ -1570,6 +1588,8 @@ async fn test_backfill_sync_processing() { NOTHING_TO_DO, ], rig.chain.slot_clock.slot_duration(), + false, + false, ) .await; } @@ -1590,8 +1610,8 @@ async fn test_backfill_sync_processing_rate_limiting_disabled() { ) .await; - for _ in 0..3 { - rig.enqueue_backfill_batch(); + for i in 0..3 { + rig.enqueue_backfill_batch(Epoch::new(i)); } // ensure all batches are processed @@ -1602,6 +1622,8 @@ async fn test_backfill_sync_processing_rate_limiting_disabled() { WorkType::ChainSegmentBackfill.into(), ], Duration::from_millis(100), + true, + true, ) .await; }