mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-27 01:33:33 +00:00
Automatically pass spans into blocking handles (#8158)
Co-Authored-By: Eitan Seri- Levi <eserilev@gmail.com> Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu> Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>
This commit is contained in:
@@ -130,7 +130,7 @@ use store::{
|
||||
};
|
||||
use task_executor::{RayonPoolType, ShutdownReason, TaskExecutor};
|
||||
use tokio_stream::Stream;
|
||||
use tracing::{Span, debug, debug_span, error, info, info_span, instrument, trace, warn};
|
||||
use tracing::{debug, debug_span, error, info, info_span, instrument, trace, warn};
|
||||
use tree_hash::TreeHash;
|
||||
use types::data::{ColumnIndex, FixedBlobSidecarList};
|
||||
use types::execution::BlockProductionVersion;
|
||||
@@ -2761,6 +2761,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
/// or already-known).
|
||||
///
|
||||
/// This method is potentially long-running and should not run on the core executor.
|
||||
#[instrument(skip_all, level = "debug")]
|
||||
pub fn filter_chain_segment(
|
||||
self: &Arc<Self>,
|
||||
chain_segment: Vec<RangeSyncBlock<T::EthSpec>>,
|
||||
@@ -2888,12 +2889,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
|
||||
// Filter uninteresting blocks from the chain segment in a blocking task.
|
||||
let chain = self.clone();
|
||||
let filter_chain_segment = debug_span!("filter_chain_segment");
|
||||
let filtered_chain_segment_future = self.spawn_blocking_handle(
|
||||
move || {
|
||||
let _guard = filter_chain_segment.enter();
|
||||
chain.filter_chain_segment(chain_segment)
|
||||
},
|
||||
move || chain.filter_chain_segment(chain_segment),
|
||||
"filter_chain_segment",
|
||||
);
|
||||
let mut filtered_chain_segment = match filtered_chain_segment_future.await {
|
||||
@@ -2924,12 +2921,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
std::mem::swap(&mut blocks, &mut filtered_chain_segment);
|
||||
|
||||
let chain = self.clone();
|
||||
let current_span = Span::current();
|
||||
let signature_verification_future = self.spawn_blocking_handle(
|
||||
move || {
|
||||
let _guard = current_span.enter();
|
||||
signature_verify_chain_segment(blocks, &chain)
|
||||
},
|
||||
move || signature_verify_chain_segment(blocks, &chain),
|
||||
"signature_verify_chain_segment",
|
||||
);
|
||||
|
||||
@@ -3019,12 +3012,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||
) -> Result<GossipVerifiedBlock<T>, BlockError> {
|
||||
let chain = self.clone();
|
||||
let span = Span::current();
|
||||
self.task_executor
|
||||
.clone()
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
let _guard = span.enter();
|
||||
let slot = block.slot();
|
||||
let graffiti_string = block.message().body().graffiti().as_utf8_lossy();
|
||||
|
||||
@@ -3332,11 +3323,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
|
||||
let data_availability_checker = self.data_availability_checker.clone();
|
||||
|
||||
let current_span = Span::current();
|
||||
let result = self
|
||||
.task_executor
|
||||
.spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || {
|
||||
let _guard = current_span.enter();
|
||||
data_availability_checker.reconstruct_data_columns(&block_root)
|
||||
})
|
||||
.await
|
||||
@@ -3811,13 +3800,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
|
||||
let block_root = {
|
||||
// Capture the current span before moving into the blocking task
|
||||
let current_span = tracing::Span::current();
|
||||
let chain = self.clone();
|
||||
self.spawn_blocking_handle(
|
||||
move || {
|
||||
// Enter the captured span in the blocking thread
|
||||
let _guard = current_span.enter();
|
||||
chain.import_block(
|
||||
block,
|
||||
block_root,
|
||||
@@ -4528,15 +4513,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
//
|
||||
// Load the parent state from disk.
|
||||
let chain = self.clone();
|
||||
let span = Span::current();
|
||||
let (state, state_root_opt) = self
|
||||
.task_executor
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
let _guard =
|
||||
debug_span!(parent: span, "load_state_for_block_production").entered();
|
||||
chain.load_state_for_block_production(slot)
|
||||
},
|
||||
move || chain.load_state_for_block_production(slot),
|
||||
"load_state_for_block_production",
|
||||
)
|
||||
.ok_or(BlockProductionError::ShuttingDown)?
|
||||
@@ -4960,13 +4940,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
.graffiti_calculator
|
||||
.get_graffiti(graffiti_settings)
|
||||
.await;
|
||||
let span = Span::current();
|
||||
let mut partial_beacon_block = self
|
||||
.task_executor
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
let _guard =
|
||||
debug_span!(parent: span, "produce_partial_beacon_block").entered();
|
||||
chain.produce_partial_beacon_block(
|
||||
state,
|
||||
state_root_opt,
|
||||
@@ -5002,14 +4979,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
match block_contents_type {
|
||||
BlockProposalContentsType::Full(block_contents) => {
|
||||
let chain = self.clone();
|
||||
let span = Span::current();
|
||||
let beacon_block_response = self
|
||||
.task_executor
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
let _guard =
|
||||
debug_span!(parent: span, "complete_partial_beacon_block")
|
||||
.entered();
|
||||
chain.complete_partial_beacon_block(
|
||||
partial_beacon_block,
|
||||
Some(block_contents),
|
||||
@@ -5026,14 +4999,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
BlockProposalContentsType::Blinded(block_contents) => {
|
||||
let chain = self.clone();
|
||||
let span = Span::current();
|
||||
let beacon_block_response = self
|
||||
.task_executor
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
let _guard =
|
||||
debug_span!(parent: span, "complete_partial_beacon_block")
|
||||
.entered();
|
||||
chain.complete_partial_beacon_block(
|
||||
partial_beacon_block,
|
||||
Some(block_contents),
|
||||
@@ -5051,13 +5020,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
} else {
|
||||
let chain = self.clone();
|
||||
let span = Span::current();
|
||||
let beacon_block_response = self
|
||||
.task_executor
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
let _guard =
|
||||
debug_span!(parent: span, "complete_partial_beacon_block").entered();
|
||||
chain.complete_partial_beacon_block(
|
||||
partial_beacon_block,
|
||||
None,
|
||||
@@ -5075,6 +5041,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(skip_all, level = "debug")]
|
||||
fn produce_partial_beacon_block(
|
||||
self: &Arc<Self>,
|
||||
mut state: BeaconState<T::EthSpec>,
|
||||
@@ -5319,6 +5286,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = "debug")]
|
||||
fn complete_partial_beacon_block<Payload: AbstractExecPayload<T::EthSpec>>(
|
||||
&self,
|
||||
partial_beacon_block: PartialBeaconBlock<T::EthSpec>,
|
||||
|
||||
@@ -19,7 +19,7 @@ use state_processing::{
|
||||
};
|
||||
use state_processing::{VerifyOperation, state_advance::complete_state_advance};
|
||||
use task_executor::JoinHandle;
|
||||
use tracing::{Instrument, Span, debug, debug_span, error, instrument, trace, warn};
|
||||
use tracing::{Instrument, debug, debug_span, error, instrument, trace, warn};
|
||||
use tree_hash::TreeHash;
|
||||
use types::consts::gloas::BUILDER_INDEX_SELF_BUILD;
|
||||
use types::{
|
||||
@@ -87,15 +87,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
//
|
||||
// Load the parent state from disk.
|
||||
let chain = self.clone();
|
||||
let span = Span::current();
|
||||
let (state, state_root_opt) = self
|
||||
.task_executor
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
let _guard =
|
||||
debug_span!(parent: span, "load_state_for_block_production").entered();
|
||||
chain.load_state_for_block_production(slot)
|
||||
},
|
||||
move || chain.load_state_for_block_production(slot),
|
||||
"load_state_for_block_production",
|
||||
)
|
||||
.ok_or(BlockProductionError::ShuttingDown)?
|
||||
@@ -135,13 +130,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
.graffiti_calculator
|
||||
.get_graffiti(graffiti_settings)
|
||||
.await;
|
||||
let span = Span::current();
|
||||
let (partial_beacon_block, state) = self
|
||||
.task_executor
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
let _guard =
|
||||
debug_span!(parent: span, "produce_partial_beacon_block_gloas").entered();
|
||||
chain.produce_partial_beacon_block_gloas(
|
||||
state,
|
||||
state_root_opt,
|
||||
@@ -175,12 +167,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
//
|
||||
// Complete the block with the execution payload bid.
|
||||
let chain = self.clone();
|
||||
let span = Span::current();
|
||||
self.task_executor
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
let _guard =
|
||||
debug_span!(parent: span, "complete_partial_beacon_block_gloas").entered();
|
||||
chain.complete_partial_beacon_block_gloas(
|
||||
partial_beacon_block,
|
||||
execution_payload_bid,
|
||||
@@ -198,6 +187,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[allow(clippy::type_complexity)]
|
||||
#[instrument(skip_all, level = "debug")]
|
||||
fn produce_partial_beacon_block_gloas(
|
||||
self: &Arc<Self>,
|
||||
mut state: BeaconState<T::EthSpec>,
|
||||
@@ -432,6 +422,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
/// - `pending_state` is the state post block application (prior to payload application)
|
||||
/// - `block_value` is the consensus-layer rewards for `block`
|
||||
#[allow(clippy::type_complexity)]
|
||||
#[instrument(skip_all, level = "debug")]
|
||||
fn complete_partial_beacon_block_gloas(
|
||||
&self,
|
||||
partial_beacon_block: PartialBeaconBlock<T::EthSpec>,
|
||||
|
||||
@@ -15,6 +15,7 @@ mod gloas;
|
||||
impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
/// Load a beacon state from the database for block production. This is a long-running process
|
||||
/// that should not be performed in an `async` context.
|
||||
#[instrument(skip_all, level = "debug")]
|
||||
pub(crate) fn load_state_for_block_production(
|
||||
self: &Arc<Self>,
|
||||
slot: Slot,
|
||||
|
||||
@@ -58,7 +58,6 @@ use store::{
|
||||
Error as StoreError, KeyValueStore, KeyValueStoreOp, StoreConfig, iter::StateRootsIterator,
|
||||
};
|
||||
use task_executor::{JoinHandle, ShutdownReason};
|
||||
use tracing::info_span;
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
use types::*;
|
||||
|
||||
@@ -528,22 +527,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
/// such a case it's critical that the `BeaconChain` keeps importing blocks so that the
|
||||
/// situation can be rectified. We avoid returning an error here so that calling functions
|
||||
/// can't abort block import because an error is returned here.
|
||||
#[instrument(name = "lh_recompute_head_at_slot", skip(self), level = "info", fields(slot = %current_slot))]
|
||||
pub async fn recompute_head_at_slot(self: &Arc<Self>, current_slot: Slot) {
|
||||
let span = info_span!(
|
||||
"lh_recompute_head_at_slot",
|
||||
slot = %current_slot
|
||||
);
|
||||
|
||||
metrics::inc_counter(&metrics::FORK_CHOICE_REQUESTS);
|
||||
let _timer = metrics::start_timer(&metrics::FORK_CHOICE_TIMES);
|
||||
|
||||
let chain = self.clone();
|
||||
match self
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
let _guard = span.enter();
|
||||
chain.recompute_head_at_slot_internal(current_slot)
|
||||
},
|
||||
move || chain.recompute_head_at_slot_internal(current_slot),
|
||||
"recompute_head_internal",
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -32,7 +32,7 @@ use mockall_double::double;
|
||||
use ssz_types::FixedVector;
|
||||
use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash;
|
||||
use std::sync::Arc;
|
||||
use tracing::{Span, debug, instrument, warn};
|
||||
use tracing::{debug, instrument, warn};
|
||||
use types::data::{BlobSidecarError, DataColumnSidecarError};
|
||||
use types::{
|
||||
BeaconStateError, Blob, BlobSidecar, ColumnIndex, EthSpec, FullPayload, Hash256, KzgProofs,
|
||||
@@ -356,12 +356,10 @@ async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
|
||||
let spec = chain_adapter.spec().clone();
|
||||
let chain_adapter_cloned = chain_adapter.clone();
|
||||
let custody_columns_indices = custody_columns_indices.to_vec();
|
||||
let current_span = Span::current();
|
||||
chain_adapter
|
||||
.executor()
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
let _guard = current_span.enter();
|
||||
let mut timer = metrics::start_timer_vec(
|
||||
&metrics::DATA_COLUMN_SIDECAR_COMPUTATION,
|
||||
&[&blobs.len().to_string()],
|
||||
|
||||
@@ -4,7 +4,7 @@ use educe::Educe;
|
||||
use eth2::types::{EventKind, SseExecutionPayloadGossip};
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use store::DatabaseBlock;
|
||||
use tracing::{Span, debug};
|
||||
use tracing::debug;
|
||||
use types::{
|
||||
ChainSpec, EthSpec, ExecutionPayloadBid, ExecutionPayloadEnvelope, Hash256, SignedBeaconBlock,
|
||||
SignedExecutionPayloadEnvelope, Slot, consts::gloas::BUILDER_INDEX_SELF_BUILD,
|
||||
@@ -270,12 +270,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
|
||||
) -> Result<GossipVerifiedEnvelope<T>, EnvelopeError> {
|
||||
let chain = self.clone();
|
||||
let span = Span::current();
|
||||
self.task_executor
|
||||
.clone()
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
let _guard = span.enter();
|
||||
let slot = envelope.slot();
|
||||
let beacon_block_root = envelope.message.beacon_block_root;
|
||||
|
||||
|
||||
@@ -192,13 +192,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
} = import_data;
|
||||
|
||||
let block_root = {
|
||||
// Capture the current span before moving into the blocking task
|
||||
let current_span = tracing::Span::current();
|
||||
let chain = self.clone();
|
||||
self.spawn_blocking_handle(
|
||||
move || {
|
||||
// Enter the captured span in the blocking thread
|
||||
let _guard = current_span.enter();
|
||||
chain.import_execution_payload_envelope(
|
||||
envelope,
|
||||
block_root,
|
||||
|
||||
Reference in New Issue
Block a user