From 99f5a92b98b579b350cdef14a3f2e9007ede8bea Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Wed, 1 Apr 2026 11:13:20 +0900 Subject: [PATCH] Automatically pass spans into blocking handles (#8158) Co-Authored-By: Eitan Seri- Levi Co-Authored-By: Eitan Seri-Levi Co-Authored-By: Jimmy Chen --- beacon_node/beacon_chain/src/beacon_chain.rs | 46 +++---------------- .../src/block_production/gloas.rs | 17 ++----- .../beacon_chain/src/block_production/mod.rs | 1 + .../beacon_chain/src/canonical_head.rs | 12 +---- .../beacon_chain/src/fetch_blobs/mod.rs | 4 +- .../gossip_verified_envelope.rs | 4 +- .../payload_envelope_verification/import.rs | 4 -- beacon_node/http_api/src/publish_blocks.rs | 11 ++--- common/task_executor/src/lib.rs | 12 ++++- 9 files changed, 29 insertions(+), 82 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 69db0c24fb..310163b4a9 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -130,7 +130,7 @@ use store::{ }; use task_executor::{RayonPoolType, ShutdownReason, TaskExecutor}; use tokio_stream::Stream; -use tracing::{Span, debug, debug_span, error, info, info_span, instrument, trace, warn}; +use tracing::{debug, debug_span, error, info, info_span, instrument, trace, warn}; use tree_hash::TreeHash; use types::data::{ColumnIndex, FixedBlobSidecarList}; use types::execution::BlockProductionVersion; @@ -2761,6 +2761,7 @@ impl BeaconChain { /// or already-known). /// /// This method is potentially long-running and should not run on the core executor. + #[instrument(skip_all, level = "debug")] pub fn filter_chain_segment( self: &Arc, chain_segment: Vec>, @@ -2888,12 +2889,8 @@ impl BeaconChain { // Filter uninteresting blocks from the chain segment in a blocking task. let chain = self.clone(); - let filter_chain_segment = debug_span!("filter_chain_segment"); let filtered_chain_segment_future = self.spawn_blocking_handle( - move || { - let _guard = filter_chain_segment.enter(); - chain.filter_chain_segment(chain_segment) - }, + move || chain.filter_chain_segment(chain_segment), "filter_chain_segment", ); let mut filtered_chain_segment = match filtered_chain_segment_future.await { @@ -2924,12 +2921,8 @@ impl BeaconChain { std::mem::swap(&mut blocks, &mut filtered_chain_segment); let chain = self.clone(); - let current_span = Span::current(); let signature_verification_future = self.spawn_blocking_handle( - move || { - let _guard = current_span.enter(); - signature_verify_chain_segment(blocks, &chain) - }, + move || signature_verify_chain_segment(blocks, &chain), "signature_verify_chain_segment", ); @@ -3019,12 +3012,10 @@ impl BeaconChain { block: Arc>, ) -> Result, BlockError> { let chain = self.clone(); - let span = Span::current(); self.task_executor .clone() .spawn_blocking_handle( move || { - let _guard = span.enter(); let slot = block.slot(); let graffiti_string = block.message().body().graffiti().as_utf8_lossy(); @@ -3332,11 +3323,9 @@ impl BeaconChain { let data_availability_checker = self.data_availability_checker.clone(); - let current_span = Span::current(); let result = self .task_executor .spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || { - let _guard = current_span.enter(); data_availability_checker.reconstruct_data_columns(&block_root) }) .await @@ -3811,13 +3800,9 @@ impl BeaconChain { } let block_root = { - // Capture the current span before moving into the blocking task - let current_span = tracing::Span::current(); let chain = self.clone(); self.spawn_blocking_handle( move || { - // Enter the captured span in the blocking thread - let _guard = current_span.enter(); chain.import_block( block, block_root, @@ -4528,15 +4513,10 @@ impl BeaconChain { // // Load the parent state from disk. let chain = self.clone(); - let span = Span::current(); let (state, state_root_opt) = self .task_executor .spawn_blocking_handle( - move || { - let _guard = - debug_span!(parent: span, "load_state_for_block_production").entered(); - chain.load_state_for_block_production(slot) - }, + move || chain.load_state_for_block_production(slot), "load_state_for_block_production", ) .ok_or(BlockProductionError::ShuttingDown)? @@ -4960,13 +4940,10 @@ impl BeaconChain { .graffiti_calculator .get_graffiti(graffiti_settings) .await; - let span = Span::current(); let mut partial_beacon_block = self .task_executor .spawn_blocking_handle( move || { - let _guard = - debug_span!(parent: span, "produce_partial_beacon_block").entered(); chain.produce_partial_beacon_block( state, state_root_opt, @@ -5002,14 +4979,10 @@ impl BeaconChain { match block_contents_type { BlockProposalContentsType::Full(block_contents) => { let chain = self.clone(); - let span = Span::current(); let beacon_block_response = self .task_executor .spawn_blocking_handle( move || { - let _guard = - debug_span!(parent: span, "complete_partial_beacon_block") - .entered(); chain.complete_partial_beacon_block( partial_beacon_block, Some(block_contents), @@ -5026,14 +4999,10 @@ impl BeaconChain { } BlockProposalContentsType::Blinded(block_contents) => { let chain = self.clone(); - let span = Span::current(); let beacon_block_response = self .task_executor .spawn_blocking_handle( move || { - let _guard = - debug_span!(parent: span, "complete_partial_beacon_block") - .entered(); chain.complete_partial_beacon_block( partial_beacon_block, Some(block_contents), @@ -5051,13 +5020,10 @@ impl BeaconChain { } } else { let chain = self.clone(); - let span = Span::current(); let beacon_block_response = self .task_executor .spawn_blocking_handle( move || { - let _guard = - debug_span!(parent: span, "complete_partial_beacon_block").entered(); chain.complete_partial_beacon_block( partial_beacon_block, None, @@ -5075,6 +5041,7 @@ impl BeaconChain { } #[allow(clippy::too_many_arguments)] + #[instrument(skip_all, level = "debug")] fn produce_partial_beacon_block( self: &Arc, mut state: BeaconState, @@ -5319,6 +5286,7 @@ impl BeaconChain { }) } + #[instrument(skip_all, level = "debug")] fn complete_partial_beacon_block>( &self, partial_beacon_block: PartialBeaconBlock, diff --git a/beacon_node/beacon_chain/src/block_production/gloas.rs b/beacon_node/beacon_chain/src/block_production/gloas.rs index 2fc4fb51f7..51caf63b7a 100644 --- a/beacon_node/beacon_chain/src/block_production/gloas.rs +++ b/beacon_node/beacon_chain/src/block_production/gloas.rs @@ -19,7 +19,7 @@ use state_processing::{ }; use state_processing::{VerifyOperation, state_advance::complete_state_advance}; use task_executor::JoinHandle; -use tracing::{Instrument, Span, debug, debug_span, error, instrument, trace, warn}; +use tracing::{Instrument, debug, debug_span, error, instrument, trace, warn}; use tree_hash::TreeHash; use types::consts::gloas::BUILDER_INDEX_SELF_BUILD; use types::{ @@ -87,15 +87,10 @@ impl BeaconChain { // // Load the parent state from disk. let chain = self.clone(); - let span = Span::current(); let (state, state_root_opt) = self .task_executor .spawn_blocking_handle( - move || { - let _guard = - debug_span!(parent: span, "load_state_for_block_production").entered(); - chain.load_state_for_block_production(slot) - }, + move || chain.load_state_for_block_production(slot), "load_state_for_block_production", ) .ok_or(BlockProductionError::ShuttingDown)? @@ -135,13 +130,10 @@ impl BeaconChain { .graffiti_calculator .get_graffiti(graffiti_settings) .await; - let span = Span::current(); let (partial_beacon_block, state) = self .task_executor .spawn_blocking_handle( move || { - let _guard = - debug_span!(parent: span, "produce_partial_beacon_block_gloas").entered(); chain.produce_partial_beacon_block_gloas( state, state_root_opt, @@ -175,12 +167,9 @@ impl BeaconChain { // // Complete the block with the execution payload bid. let chain = self.clone(); - let span = Span::current(); self.task_executor .spawn_blocking_handle( move || { - let _guard = - debug_span!(parent: span, "complete_partial_beacon_block_gloas").entered(); chain.complete_partial_beacon_block_gloas( partial_beacon_block, execution_payload_bid, @@ -198,6 +187,7 @@ impl BeaconChain { #[allow(clippy::too_many_arguments)] #[allow(clippy::type_complexity)] + #[instrument(skip_all, level = "debug")] fn produce_partial_beacon_block_gloas( self: &Arc, mut state: BeaconState, @@ -432,6 +422,7 @@ impl BeaconChain { /// - `pending_state` is the state post block application (prior to payload application) /// - `block_value` is the consensus-layer rewards for `block` #[allow(clippy::type_complexity)] + #[instrument(skip_all, level = "debug")] fn complete_partial_beacon_block_gloas( &self, partial_beacon_block: PartialBeaconBlock, diff --git a/beacon_node/beacon_chain/src/block_production/mod.rs b/beacon_node/beacon_chain/src/block_production/mod.rs index b33323f527..256b67086a 100644 --- a/beacon_node/beacon_chain/src/block_production/mod.rs +++ b/beacon_node/beacon_chain/src/block_production/mod.rs @@ -15,6 +15,7 @@ mod gloas; impl BeaconChain { /// Load a beacon state from the database for block production. This is a long-running process /// that should not be performed in an `async` context. + #[instrument(skip_all, level = "debug")] pub(crate) fn load_state_for_block_production( self: &Arc, slot: Slot, diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index 9dd7d62a27..f6377e6ea5 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -58,7 +58,6 @@ use store::{ Error as StoreError, KeyValueStore, KeyValueStoreOp, StoreConfig, iter::StateRootsIterator, }; use task_executor::{JoinHandle, ShutdownReason}; -use tracing::info_span; use tracing::{debug, error, info, instrument, warn}; use types::*; @@ -528,22 +527,15 @@ impl BeaconChain { /// such a case it's critical that the `BeaconChain` keeps importing blocks so that the /// situation can be rectified. We avoid returning an error here so that calling functions /// can't abort block import because an error is returned here. + #[instrument(name = "lh_recompute_head_at_slot", skip(self), level = "info", fields(slot = %current_slot))] pub async fn recompute_head_at_slot(self: &Arc, current_slot: Slot) { - let span = info_span!( - "lh_recompute_head_at_slot", - slot = %current_slot - ); - metrics::inc_counter(&metrics::FORK_CHOICE_REQUESTS); let _timer = metrics::start_timer(&metrics::FORK_CHOICE_TIMES); let chain = self.clone(); match self .spawn_blocking_handle( - move || { - let _guard = span.enter(); - chain.recompute_head_at_slot_internal(current_slot) - }, + move || chain.recompute_head_at_slot_internal(current_slot), "recompute_head_internal", ) .await diff --git a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs index bae61767cc..db76ff887d 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs @@ -32,7 +32,7 @@ use mockall_double::double; use ssz_types::FixedVector; use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash; use std::sync::Arc; -use tracing::{Span, debug, instrument, warn}; +use tracing::{debug, instrument, warn}; use types::data::{BlobSidecarError, DataColumnSidecarError}; use types::{ BeaconStateError, Blob, BlobSidecar, ColumnIndex, EthSpec, FullPayload, Hash256, KzgProofs, @@ -356,12 +356,10 @@ async fn compute_custody_columns_to_import( let spec = chain_adapter.spec().clone(); let chain_adapter_cloned = chain_adapter.clone(); let custody_columns_indices = custody_columns_indices.to_vec(); - let current_span = Span::current(); chain_adapter .executor() .spawn_blocking_handle( move || { - let _guard = current_span.enter(); let mut timer = metrics::start_timer_vec( &metrics::DATA_COLUMN_SIDECAR_COMPUTATION, &[&blobs.len().to_string()], diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs index 9a4ed2d044..4d40a29332 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs @@ -4,7 +4,7 @@ use educe::Educe; use eth2::types::{EventKind, SseExecutionPayloadGossip}; use parking_lot::{Mutex, RwLock}; use store::DatabaseBlock; -use tracing::{Span, debug}; +use tracing::debug; use types::{ ChainSpec, EthSpec, ExecutionPayloadBid, ExecutionPayloadEnvelope, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, consts::gloas::BUILDER_INDEX_SELF_BUILD, @@ -270,12 +270,10 @@ impl BeaconChain { envelope: Arc>, ) -> Result, EnvelopeError> { let chain = self.clone(); - let span = Span::current(); self.task_executor .clone() .spawn_blocking_handle( move || { - let _guard = span.enter(); let slot = envelope.slot(); let beacon_block_root = envelope.message.beacon_block_root; diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index bae848c3c1..39925d65d2 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -192,13 +192,9 @@ impl BeaconChain { } = import_data; let block_root = { - // Capture the current span before moving into the blocking task - let current_span = tracing::Span::current(); let chain = self.clone(); self.spawn_blocking_handle( move || { - // Enter the captured span in the blocking thread - let _guard = current_span.enter(); chain.import_execution_payload_envelope( envelope, block_root, diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 43dfbeb836..eb7e56e9cc 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -146,12 +146,8 @@ pub async fn publish_block>( let slot = block.message().slot(); let sender_clone = network_tx.clone(); - let build_sidecar_task_handle = spawn_build_data_sidecar_task( - chain.clone(), - block.clone(), - unverified_blobs, - current_span.clone(), - )?; + let build_sidecar_task_handle = + spawn_build_data_sidecar_task(chain.clone(), block.clone(), unverified_blobs)?; // Gossip verify the block and blobs/data columns separately. let gossip_verified_block_result = unverified_block.into_gossip_verified_block(&chain); @@ -358,7 +354,6 @@ fn spawn_build_data_sidecar_task( chain: Arc>, block: Arc>>, proofs_and_blobs: UnverifiedBlobs, - current_span: Span, ) -> Result>, Rejection> { chain .clone() @@ -368,7 +363,7 @@ fn spawn_build_data_sidecar_task( let Some((kzg_proofs, blobs)) = proofs_and_blobs else { return Ok((vec![], vec![])); }; - let _guard = debug_span!(parent: current_span, "build_data_sidecars").entered(); + let _span = debug_span!("build_data_sidecars").entered(); let peer_das_enabled = chain.spec.is_peer_das_enabled_for_epoch(block.epoch()); if !peer_das_enabled { diff --git a/common/task_executor/src/lib.rs b/common/task_executor/src/lib.rs index d3d862f96c..07716fa2e7 100644 --- a/common/task_executor/src/lib.rs +++ b/common/task_executor/src/lib.rs @@ -6,7 +6,7 @@ use futures::channel::mpsc::Sender; use futures::prelude::*; use std::sync::{Arc, Weak}; use tokio::runtime::{Handle, Runtime}; -use tracing::debug; +use tracing::{Span, debug}; use crate::rayon_pool_provider::RayonPoolProvider; pub use crate::rayon_pool_provider::RayonPoolType; @@ -225,9 +225,11 @@ impl TaskExecutor { F: FnOnce() + Send + 'static, { let thread_pool = self.rayon_pool_provider.get_thread_pool(rayon_pool_type); + let span = Span::current(); self.spawn_blocking( move || { thread_pool.install(|| { + let _guard = span.enter(); task(); }); }, @@ -247,8 +249,10 @@ impl TaskExecutor { { let thread_pool = self.rayon_pool_provider.get_thread_pool(rayon_pool_type); let (tx, rx) = tokio::sync::oneshot::channel(); + let span = Span::current(); thread_pool.spawn(move || { + let _guard = span.enter(); let result = task(); let _ = tx.send(result); }); @@ -320,8 +324,12 @@ impl TaskExecutor { let timer = metrics::start_timer_vec(&metrics::BLOCKING_TASKS_HISTOGRAM, &[name]); metrics::inc_gauge_vec(&metrics::BLOCKING_TASKS_COUNT, &[name]); + let span = Span::current(); let join_handle = if let Some(handle) = self.handle() { - handle.spawn_blocking(task) + handle.spawn_blocking(move || { + let _guard = span.enter(); + task() + }) } else { debug!("Couldn't spawn task. Runtime shutting down"); return None;