Instrument tracing spans for block processing and import (#7816)

#7815

- Removes all existing spans, so some span fields that currently appear in logs (such as `service_name`) may be lost.
- Instruments a few key code paths in the beacon node, starting from the **root spans** listed below:

* Gossip block and blobs
  * `process_gossip_data_column_sidecar`
  * `process_gossip_blob`
  * `process_gossip_block`
* RPC block and blobs
  * `process_rpc_block`
  * `process_rpc_blobs`
  * `process_rpc_custody_columns`
* RPC blocks (range and backfill)
  * `process_chain_segment`
* `PendingComponents` lifecycle
  * `pending_components`

To test locally:
* Run Grafana and Tempo with https://github.com/sigp/lighthouse-metrics/pull/57
* Run Lighthouse BN with `--telemetry-collector-url http://localhost:4317`

Some captured traces can be found here: https://hackmd.io/@jimmygchen/r1sLOxPPeg

Removing the old spans seems to have reduced memory usage quite a lot - I think we were using them excessively, including on long-running tasks:
<img width="910" height="495" alt="image" src="https://github.com/user-attachments/assets/5208bbe4-53b2-4ead-bc71-0b782c788669" />
This commit is contained in:
Jimmy Chen
2025-08-08 15:32:22 +10:00
committed by GitHub
parent 6dfab22267
commit 40c2fd5ff4
52 changed files with 633 additions and 1164 deletions

View File

@@ -126,7 +126,7 @@ use store::{
};
use task_executor::{ShutdownReason, TaskExecutor};
use tokio_stream::Stream;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, debug_span, error, info, info_span, instrument, trace, warn, Span};
use tree_hash::TreeHash;
use types::blob_sidecar::FixedBlobSidecarList;
use types::data_column_sidecar::ColumnIndex;
@@ -2203,6 +2203,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})
}
#[instrument(skip_all, level = "trace")]
pub fn verify_data_column_sidecar_for_gossip(
self: &Arc<Self>,
data_column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
@@ -2215,6 +2216,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})
}
#[instrument(skip_all, level = "trace")]
pub fn verify_blob_sidecar_for_gossip(
self: &Arc<Self>,
blob_sidecar: Arc<BlobSidecar<T::EthSpec>>,
@@ -2851,8 +2853,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Filter uninteresting blocks from the chain segment in a blocking task.
let chain = self.clone();
let filter_chain_segment = debug_span!("filter_chain_segment");
let filtered_chain_segment_future = self.spawn_blocking_handle(
move || chain.filter_chain_segment(chain_segment),
move || {
let _guard = filter_chain_segment.enter();
chain.filter_chain_segment(chain_segment)
},
"filter_chain_segment",
);
let mut filtered_chain_segment = match filtered_chain_segment_future.await {
@@ -2883,8 +2889,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
std::mem::swap(&mut blocks, &mut filtered_chain_segment);
let chain = self.clone();
let current_span = Span::current();
let signature_verification_future = self.spawn_blocking_handle(
move || signature_verify_chain_segment(blocks, &chain),
move || {
let _guard = current_span.enter();
signature_verify_chain_segment(blocks, &chain)
},
"signature_verify_chain_segment",
);
@@ -2974,10 +2984,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block: Arc<SignedBeaconBlock<T::EthSpec>>,
) -> Result<GossipVerifiedBlock<T>, BlockError> {
let chain = self.clone();
let span = Span::current();
self.task_executor
.clone()
.spawn_blocking_handle(
move || {
let _guard = span.enter();
let slot = block.slot();
let graffiti_string = block.message().body().graffiti().as_utf8_lossy();
@@ -3006,7 +3018,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
},
"payload_verification_handle",
"gossip_block_verification_handle",
)
.ok_or(BeaconChainError::RuntimeShutdown)?
.await
@@ -3015,6 +3027,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Cache the blob in the processing cache, process it, then evict it from the cache if it was
/// imported or errors.
#[instrument(skip_all, level = "debug")]
pub async fn process_gossip_blob(
self: &Arc<Self>,
blob: GossipVerifiedBlob<T>,
@@ -3088,6 +3101,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Cache the blobs in the processing cache, process it, then evict it from the cache if it was
/// imported or errors.
#[instrument(skip_all, level = "debug")]
pub async fn process_rpc_blobs(
self: &Arc<Self>,
slot: Slot,
@@ -3383,6 +3397,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// Returns an `Err` if the given block was invalid, or an error was encountered during
/// verification.
#[instrument(skip_all, fields(block_root = ?block_root, block_source = %block_source))]
pub async fn process_block<B: IntoExecutionPendingBlock<T>>(
self: &Arc<Self>,
block_root: Hash256,
@@ -3499,6 +3514,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// get a fully `ExecutedBlock`.
///
/// An error is returned if the verification handle couldn't be awaited.
#[instrument(skip_all, level = "debug")]
pub async fn into_executed_block(
self: Arc<Self>,
execution_pending_block: ExecutionPendingBlock<T>,
@@ -3547,6 +3563,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Checks if the block is available, and imports immediately if so, otherwise caches the block
/// in the data availability checker.
#[instrument(skip_all)]
async fn check_block_availability_and_import(
self: &Arc<Self>,
block: AvailabilityPendingExecutedBlock<T::EthSpec>,
@@ -3747,6 +3764,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
#[instrument(skip_all)]
pub async fn import_available_block(
self: &Arc<Self>,
block: Box<AvailableExecutedBlock<T::EthSpec>>,
@@ -3775,11 +3793,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// TODO(das) record custody column available timestamp
// import
let chain = self.clone();
let block_root = self
.spawn_blocking_handle(
let block_root = {
// Capture the current span before moving into the blocking task
let current_span = tracing::Span::current();
let chain = self.clone();
self.spawn_blocking_handle(
move || {
// Enter the captured span in the blocking thread
let _guard = current_span.enter();
chain.import_block(
block,
block_root,
@@ -3791,7 +3812,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
},
"payload_verification_handle",
)
.await??;
.await??
};
// Remove block components from da_checker AFTER completing block import. Then we can assert
// the following invariant:
@@ -3815,6 +3837,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// An error is returned if the block was unable to be imported. It may be partially imported
/// (i.e., this function is not atomic).
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
fn import_block(
&self,
signed_block: AvailableBlock<T::EthSpec>,
@@ -3852,6 +3875,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Only take a write lock if there are new keys to import.
if state.validators().len() > pubkey_cache.len() {
let _pubkey_span = debug_span!(
"pubkey_cache_update",
new_validators = tracing::field::Empty,
cache_len_before = pubkey_cache.len()
)
.entered();
parking_lot::RwLockUpgradableReadGuard::upgrade(pubkey_cache)
.import_new_pubkeys(&state)?
} else {
@@ -3865,6 +3895,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// However, latency between the VC and the BN might cause the VC to produce attestations at
// a previous slot.
if state.current_epoch().saturating_add(1_u64) >= current_epoch {
let _attester_span = debug_span!("attester_cache_update").entered();
self.attester_cache
.maybe_cache_state(&state, block_root, &self.spec)
.map_err(BeaconChainError::from)?;
@@ -4009,6 +4040,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
ops.push(StoreOp::PutBlock(block_root, signed_block.clone()));
ops.push(StoreOp::PutState(block.state_root(), &state));
let db_span = info_span!("persist_blocks_and_blobs").entered();
if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) {
error!(
msg = "Restoring fork choice from disk",
@@ -4021,6 +4054,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.unwrap_or(e.into()));
}
drop(db_span);
// The fork choice write-lock is dropped *after* the on-disk database has been updated.
// This prevents inconsistency between the two at the expense of concurrency.
drop(fork_choice);
@@ -4155,6 +4190,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
/// Process a block for the validator monitor, including all its constituent messages.
#[instrument(skip_all, level = "debug")]
fn import_block_update_validator_monitor(
&self,
block: BeaconBlockRef<T::EthSpec>,
@@ -4249,6 +4285,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Iterate through the attestations in the block and register them as "observed".
///
/// This will stop us from propagating them on the gossip network.
#[instrument(skip_all, level = "debug")]
fn import_block_observe_attestations(
&self,
block: BeaconBlockRef<T::EthSpec>,
@@ -4311,6 +4348,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
/// If a slasher is configured, provide the attestations from the block.
#[instrument(skip_all, level = "debug")]
fn import_block_update_slasher(
&self,
block: BeaconBlockRef<T::EthSpec>,
@@ -4409,6 +4447,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// For the current and next epoch of this state, ensure we have the shuffling from this
// block in our cache.
#[instrument(skip_all, level = "debug")]
fn import_block_update_shuffling_cache(
&self,
block_root: Hash256,