Implement custom OpenTelemetry sampler to filter uninstrumented traces (#8647)

Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>
This commit is contained in:
Jimmy Chen
2026-01-22 16:11:26 +11:00
committed by GitHub
parent 21cabba1a2
commit 7f065009a7
24 changed files with 139 additions and 157 deletions

17
Cargo.lock generated
View File

@@ -1234,7 +1234,6 @@ dependencies = [
"int_to_bytes", "int_to_bytes",
"itertools 0.10.5", "itertools 0.10.5",
"kzg", "kzg",
"lighthouse_tracing",
"lighthouse_version", "lighthouse_version",
"logging", "logging",
"lru 0.12.5", "lru 0.12.5",
@@ -4224,7 +4223,6 @@ dependencies = [
"health_metrics", "health_metrics",
"hex", "hex",
"lighthouse_network", "lighthouse_network",
"lighthouse_tracing",
"lighthouse_version", "lighthouse_version",
"logging", "logging",
"lru 0.12.5", "lru 0.12.5",
@@ -5406,7 +5404,6 @@ dependencies = [
"futures", "futures",
"initialized_validators", "initialized_validators",
"lighthouse_network", "lighthouse_network",
"lighthouse_tracing",
"lighthouse_version", "lighthouse_version",
"logging", "logging",
"malloc_utils", "malloc_utils",
@@ -5427,6 +5424,7 @@ dependencies = [
"tracing", "tracing",
"tracing-opentelemetry", "tracing-opentelemetry",
"tracing-subscriber", "tracing-subscriber",
"tracing_samplers",
"types", "types",
"validator_client", "validator_client",
"validator_dir", "validator_dir",
@@ -5488,10 +5486,6 @@ dependencies = [
"unsigned-varint", "unsigned-varint",
] ]
[[package]]
name = "lighthouse_tracing"
version = "0.1.0"
[[package]] [[package]]
name = "lighthouse_validator_store" name = "lighthouse_validator_store"
version = "0.1.0" version = "0.1.0"
@@ -6175,7 +6169,6 @@ dependencies = [
"kzg", "kzg",
"libp2p", "libp2p",
"lighthouse_network", "lighthouse_network",
"lighthouse_tracing",
"logging", "logging",
"lru_cache", "lru_cache",
"matches", "matches",
@@ -9356,6 +9349,14 @@ dependencies = [
"tracing-serde", "tracing-serde",
] ]
[[package]]
name = "tracing_samplers"
version = "0.1.0"
dependencies = [
"opentelemetry",
"opentelemetry_sdk",
]
[[package]] [[package]]
name = "tree_hash" name = "tree_hash"
version = "0.12.0" version = "0.12.0"

View File

@@ -11,7 +11,6 @@ members = [
"beacon_node/http_api", "beacon_node/http_api",
"beacon_node/http_metrics", "beacon_node/http_metrics",
"beacon_node/lighthouse_network", "beacon_node/lighthouse_network",
"beacon_node/lighthouse_tracing",
"beacon_node/network", "beacon_node/network",
"beacon_node/operation_pool", "beacon_node/operation_pool",
"beacon_node/store", "beacon_node/store",
@@ -44,6 +43,7 @@ members = [
"common/target_check", "common/target_check",
"common/task_executor", "common/task_executor",
"common/test_random_derive", "common/test_random_derive",
"common/tracing_samplers",
"common/validator_dir", "common/validator_dir",
"common/warp_utils", "common/warp_utils",
"common/workspace_members", "common/workspace_members",
@@ -182,7 +182,6 @@ libp2p = { git = "https://github.com/libp2p/rust-libp2p.git", default-features =
] } ] }
libsecp256k1 = "0.7" libsecp256k1 = "0.7"
lighthouse_network = { path = "beacon_node/lighthouse_network" } lighthouse_network = { path = "beacon_node/lighthouse_network" }
lighthouse_tracing = { path = "beacon_node/lighthouse_tracing" }
lighthouse_validator_store = { path = "validator_client/lighthouse_validator_store" } lighthouse_validator_store = { path = "validator_client/lighthouse_validator_store" }
lighthouse_version = { path = "common/lighthouse_version" } lighthouse_version = { path = "common/lighthouse_version" }
lockfile = { path = "common/lockfile" } lockfile = { path = "common/lockfile" }
@@ -269,6 +268,7 @@ tracing-core = "0.1"
tracing-log = "0.2" tracing-log = "0.2"
tracing-opentelemetry = "0.31.0" tracing-opentelemetry = "0.31.0"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
tracing_samplers = { path = "common/tracing_samplers" }
tree_hash = "0.12.0" tree_hash = "0.12.0"
tree_hash_derive = "0.12.0" tree_hash_derive = "0.12.0"
typenum = "1" typenum = "1"

View File

@@ -34,7 +34,6 @@ hex = { workspace = true }
int_to_bytes = { workspace = true } int_to_bytes = { workspace = true }
itertools = { workspace = true } itertools = { workspace = true }
kzg = { workspace = true } kzg = { workspace = true }
lighthouse_tracing = { workspace = true }
lighthouse_version = { workspace = true } lighthouse_version = { workspace = true }
logging = { workspace = true } logging = { workspace = true }
lru = { workspace = true } lru = { workspace = true }

View File

@@ -91,7 +91,6 @@ use futures::channel::mpsc::Sender;
use itertools::Itertools; use itertools::Itertools;
use itertools::process_results; use itertools::process_results;
use kzg::Kzg; use kzg::Kzg;
use lighthouse_tracing::SPAN_PRODUCE_UNAGGREGATED_ATTESTATION;
use logging::crit; use logging::crit;
use operation_pool::{ use operation_pool::{
CompactAttestationRef, OperationPool, PersistedOperationPool, ReceivedPreCapella, CompactAttestationRef, OperationPool, PersistedOperationPool, ReceivedPreCapella,
@@ -1843,7 +1842,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// ## Errors /// ## Errors
/// ///
/// May return an error if the `request_slot` is too far behind the head state. /// May return an error if the `request_slot` is too far behind the head state.
#[instrument(name = SPAN_PRODUCE_UNAGGREGATED_ATTESTATION, skip_all, fields(%request_slot, %request_index), level = "debug")] #[instrument(name = "lh_produce_unaggregated_attestation", skip_all, fields(%request_slot, %request_index), level = "debug")]
pub fn produce_unaggregated_attestation( pub fn produce_unaggregated_attestation(
&self, &self,
request_slot: Slot, request_slot: Slot,

View File

@@ -47,7 +47,7 @@ use fork_choice::{
ResetPayloadStatuses, ResetPayloadStatuses,
}; };
use itertools::process_results; use itertools::process_results;
use lighthouse_tracing::SPAN_RECOMPUTE_HEAD;
use logging::crit; use logging::crit;
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard}; use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
use slot_clock::SlotClock; use slot_clock::SlotClock;
@@ -514,7 +514,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// can't abort block import because an error is returned here. /// can't abort block import because an error is returned here.
pub async fn recompute_head_at_slot(self: &Arc<Self>, current_slot: Slot) { pub async fn recompute_head_at_slot(self: &Arc<Self>, current_slot: Slot) {
let span = info_span!( let span = info_span!(
SPAN_RECOMPUTE_HEAD, "lh_recompute_head_at_slot",
slot = %current_slot slot = %current_slot
); );

View File

@@ -9,7 +9,6 @@ use crate::block_verification_types::{
use crate::data_availability_checker::{Availability, AvailabilityCheckError}; use crate::data_availability_checker::{Availability, AvailabilityCheckError};
use crate::data_column_verification::KzgVerifiedCustodyDataColumn; use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
use crate::{BeaconChainTypes, BlockProcessStatus}; use crate::{BeaconChainTypes, BlockProcessStatus};
use lighthouse_tracing::SPAN_PENDING_COMPONENTS;
use lru::LruCache; use lru::LruCache;
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
use ssz_types::{RuntimeFixedVector, RuntimeVariableList}; use ssz_types::{RuntimeFixedVector, RuntimeVariableList};
@@ -334,7 +333,7 @@ impl<E: EthSpec> PendingComponents<E> {
/// Returns an empty `PendingComponents` object with the given block root. /// Returns an empty `PendingComponents` object with the given block root.
pub fn empty(block_root: Hash256, max_len: usize) -> Self { pub fn empty(block_root: Hash256, max_len: usize) -> Self {
let span = debug_span!(parent: None, SPAN_PENDING_COMPONENTS, %block_root); let span = debug_span!(parent: None, "lh_pending_components", %block_root);
let _guard = span.clone().entered(); let _guard = span.clone().entered();
Self { Self {
block_root, block_root,

View File

@@ -23,7 +23,6 @@ futures = { workspace = true }
health_metrics = { workspace = true } health_metrics = { workspace = true }
hex = { workspace = true } hex = { workspace = true }
lighthouse_network = { workspace = true } lighthouse_network = { workspace = true }
lighthouse_tracing = { workspace = true }
lighthouse_version = { workspace = true } lighthouse_version = { workspace = true }
logging = { workspace = true } logging = { workspace = true }
lru = { workspace = true } lru = { workspace = true }

View File

@@ -12,7 +12,6 @@ use beacon_chain::{
}; };
use eth2::beacon_response::ForkVersionedResponse; use eth2::beacon_response::ForkVersionedResponse;
use eth2::types::{self as api_types, ProduceBlockV3Metadata, SkipRandaoVerification}; use eth2::types::{self as api_types, ProduceBlockV3Metadata, SkipRandaoVerification};
use lighthouse_tracing::{SPAN_PRODUCE_BLOCK_V2, SPAN_PRODUCE_BLOCK_V3};
use ssz::Encode; use ssz::Encode;
use std::sync::Arc; use std::sync::Arc;
use tracing::instrument; use tracing::instrument;
@@ -45,7 +44,7 @@ pub fn get_randao_verification(
} }
#[instrument( #[instrument(
name = SPAN_PRODUCE_BLOCK_V3, name = "lh_produce_block_v3",
skip_all, skip_all,
fields(%slot) fields(%slot)
)] )]
@@ -169,7 +168,7 @@ pub async fn produce_blinded_block_v2<T: BeaconChainTypes>(
} }
#[instrument( #[instrument(
name = SPAN_PRODUCE_BLOCK_V2, name = "lh_produce_block_v2",
skip_all, skip_all,
fields(%slot) fields(%slot)
)] )]

View File

@@ -19,7 +19,6 @@ use eth2::{
use execution_layer::{ProvenancedPayload, SubmitBlindedBlockResponse}; use execution_layer::{ProvenancedPayload, SubmitBlindedBlockResponse};
use futures::TryFutureExt; use futures::TryFutureExt;
use lighthouse_network::PubsubMessage; use lighthouse_network::PubsubMessage;
use lighthouse_tracing::SPAN_PUBLISH_BLOCK;
use network::NetworkMessage; use network::NetworkMessage;
use rand::prelude::SliceRandom; use rand::prelude::SliceRandom;
use slot_clock::SlotClock; use slot_clock::SlotClock;
@@ -79,7 +78,7 @@ impl<T: BeaconChainTypes> ProvenancedBlock<T, Arc<SignedBeaconBlock<T::EthSpec>>
/// Handles a request from the HTTP API for full blocks. /// Handles a request from the HTTP API for full blocks.
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[instrument( #[instrument(
name = SPAN_PUBLISH_BLOCK, name = "lh_publish_block",
level = "info", level = "info",
skip_all, skip_all,
fields(block_root = field::Empty, ?validation_level, block_slot = field::Empty, provenance = field::Empty) fields(block_root = field::Empty, ?validation_level, block_slot = field::Empty, provenance = field::Empty)

View File

@@ -1,4 +0,0 @@
[package]
name = "lighthouse_tracing"
version = "0.1.0"
edition = { workspace = true }

View File

@@ -1,83 +0,0 @@
//! This module contains root span identifiers for key code paths in the beacon node.
//!
//! TODO: These span identifiers will be used to implement selective tracing export (to be implemented),
//! where only the listed root spans and their descendants will be exported to the tracing backend.
/// Root span names for block production and publishing
pub const SPAN_PRODUCE_BLOCK_V2: &str = "produce_block_v2";
pub const SPAN_PRODUCE_BLOCK_V3: &str = "produce_block_v3";
pub const SPAN_PUBLISH_BLOCK: &str = "publish_block";
/// Root span names for attestation production
pub const SPAN_PRODUCE_UNAGGREGATED_ATTESTATION: &str = "produce_unaggregated_attestation";
/// Data Availability checker span identifiers
pub const SPAN_PENDING_COMPONENTS: &str = "pending_components";
/// Gossip methods root spans
pub const SPAN_PROCESS_GOSSIP_DATA_COLUMN: &str = "process_gossip_data_column";
pub const SPAN_PROCESS_GOSSIP_BLOB: &str = "process_gossip_blob";
pub const SPAN_PROCESS_GOSSIP_BLOCK: &str = "process_gossip_block";
/// Sync methods root spans
pub const SPAN_SYNCING_CHAIN: &str = "syncing_chain";
pub const SPAN_OUTGOING_RANGE_REQUEST: &str = "outgoing_range_request";
pub const SPAN_SINGLE_BLOCK_LOOKUP: &str = "single_block_lookup";
pub const SPAN_OUTGOING_BLOCK_BY_ROOT_REQUEST: &str = "outgoing_block_by_root_request";
pub const SPAN_OUTGOING_CUSTODY_REQUEST: &str = "outgoing_custody_request";
pub const SPAN_PROCESS_RPC_BLOCK: &str = "process_rpc_block";
pub const SPAN_PROCESS_RPC_BLOBS: &str = "process_rpc_blobs";
pub const SPAN_PROCESS_RPC_CUSTODY_COLUMNS: &str = "process_rpc_custody_columns";
pub const SPAN_PROCESS_CHAIN_SEGMENT: &str = "process_chain_segment";
pub const SPAN_CUSTODY_BACKFILL_SYNC_BATCH_REQUEST: &str = "custody_backfill_sync_batch_request";
pub const SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL: &str = "process_chain_segment_backfill";
pub const SPAN_CUSTODY_BACKFILL_SYNC_IMPORT_COLUMNS: &str = "custody_backfill_sync_import_columns";
/// Fork choice root spans
pub const SPAN_RECOMPUTE_HEAD: &str = "recompute_head_at_slot";
/// RPC methods root spans
pub const SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST: &str = "handle_blocks_by_range_request";
pub const SPAN_HANDLE_BLOBS_BY_RANGE_REQUEST: &str = "handle_blobs_by_range_request";
pub const SPAN_HANDLE_DATA_COLUMNS_BY_RANGE_REQUEST: &str = "handle_data_columns_by_range_request";
pub const SPAN_HANDLE_BLOCKS_BY_ROOT_REQUEST: &str = "handle_blocks_by_root_request";
pub const SPAN_HANDLE_BLOBS_BY_ROOT_REQUEST: &str = "handle_blobs_by_root_request";
pub const SPAN_HANDLE_DATA_COLUMNS_BY_ROOT_REQUEST: &str = "handle_data_columns_by_root_request";
pub const SPAN_HANDLE_LIGHT_CLIENT_UPDATES_BY_RANGE: &str = "handle_light_client_updates_by_range";
pub const SPAN_HANDLE_LIGHT_CLIENT_BOOTSTRAP: &str = "handle_light_client_bootstrap";
pub const SPAN_HANDLE_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str =
"handle_light_client_optimistic_update";
pub const SPAN_HANDLE_LIGHT_CLIENT_FINALITY_UPDATE: &str = "handle_light_client_finality_update";
/// List of all root span names that are allowed to be exported to the tracing backend.
/// Only these spans and their descendants will be processed to reduce noise from
/// uninstrumented code paths. New root spans must be added to this list to be traced.
pub const LH_BN_ROOT_SPAN_NAMES: &[&str] = &[
SPAN_PRODUCE_BLOCK_V2,
SPAN_PRODUCE_BLOCK_V3,
SPAN_PUBLISH_BLOCK,
SPAN_PENDING_COMPONENTS,
SPAN_PROCESS_GOSSIP_DATA_COLUMN,
SPAN_PROCESS_GOSSIP_BLOB,
SPAN_PROCESS_GOSSIP_BLOCK,
SPAN_SYNCING_CHAIN,
SPAN_OUTGOING_RANGE_REQUEST,
SPAN_SINGLE_BLOCK_LOOKUP,
SPAN_PROCESS_RPC_BLOCK,
SPAN_PROCESS_RPC_BLOBS,
SPAN_PROCESS_RPC_CUSTODY_COLUMNS,
SPAN_PROCESS_CHAIN_SEGMENT,
SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL,
SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST,
SPAN_HANDLE_BLOBS_BY_RANGE_REQUEST,
SPAN_HANDLE_DATA_COLUMNS_BY_RANGE_REQUEST,
SPAN_HANDLE_BLOCKS_BY_ROOT_REQUEST,
SPAN_HANDLE_BLOBS_BY_ROOT_REQUEST,
SPAN_HANDLE_DATA_COLUMNS_BY_ROOT_REQUEST,
SPAN_HANDLE_LIGHT_CLIENT_UPDATES_BY_RANGE,
SPAN_HANDLE_LIGHT_CLIENT_BOOTSTRAP,
SPAN_HANDLE_LIGHT_CLIENT_OPTIMISTIC_UPDATE,
SPAN_HANDLE_LIGHT_CLIENT_FINALITY_UPDATE,
SPAN_CUSTODY_BACKFILL_SYNC_BATCH_REQUEST,
SPAN_CUSTODY_BACKFILL_SYNC_IMPORT_COLUMNS,
];

View File

@@ -29,7 +29,6 @@ hex = { workspace = true }
igd-next = { version = "0.16", features = ["aio_tokio"] } igd-next = { version = "0.16", features = ["aio_tokio"] }
itertools = { workspace = true } itertools = { workspace = true }
lighthouse_network = { workspace = true } lighthouse_network = { workspace = true }
lighthouse_tracing = { workspace = true }
logging = { workspace = true } logging = { workspace = true }
lru_cache = { workspace = true } lru_cache = { workspace = true }
metrics = { workspace = true } metrics = { workspace = true }

View File

@@ -21,9 +21,6 @@ use beacon_chain::{
}; };
use beacon_processor::{Work, WorkEvent}; use beacon_processor::{Work, WorkEvent};
use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource};
use lighthouse_tracing::{
SPAN_PROCESS_GOSSIP_BLOB, SPAN_PROCESS_GOSSIP_BLOCK, SPAN_PROCESS_GOSSIP_DATA_COLUMN,
};
use logging::crit; use logging::crit;
use operation_pool::ReceivedPreCapella; use operation_pool::ReceivedPreCapella;
use slot_clock::SlotClock; use slot_clock::SlotClock;
@@ -605,7 +602,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
} }
#[instrument( #[instrument(
name = SPAN_PROCESS_GOSSIP_DATA_COLUMN, name = "lh_process_gossip_data_column",
parent = None, parent = None,
level = "debug", level = "debug",
skip_all, skip_all,
@@ -769,7 +766,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[instrument( #[instrument(
name = SPAN_PROCESS_GOSSIP_BLOB, name = "lh_process_gossip_blob",
parent = None, parent = None,
level = "debug", level = "debug",
skip_all, skip_all,
@@ -1135,7 +1132,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Raises a log if there are errors. /// Raises a log if there are errors.
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[instrument( #[instrument(
name = SPAN_PROCESS_GOSSIP_BLOCK, name = "lh_process_gossip_block",
parent = None, parent = None,
level = "debug", level = "debug",
skip_all, skip_all,

View File

@@ -10,13 +10,6 @@ use lighthouse_network::rpc::methods::{
}; };
use lighthouse_network::rpc::*; use lighthouse_network::rpc::*;
use lighthouse_network::{PeerId, ReportSource, Response, SyncInfo}; use lighthouse_network::{PeerId, ReportSource, Response, SyncInfo};
use lighthouse_tracing::{
SPAN_HANDLE_BLOBS_BY_RANGE_REQUEST, SPAN_HANDLE_BLOBS_BY_ROOT_REQUEST,
SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST, SPAN_HANDLE_BLOCKS_BY_ROOT_REQUEST,
SPAN_HANDLE_DATA_COLUMNS_BY_RANGE_REQUEST, SPAN_HANDLE_DATA_COLUMNS_BY_ROOT_REQUEST,
SPAN_HANDLE_LIGHT_CLIENT_BOOTSTRAP, SPAN_HANDLE_LIGHT_CLIENT_FINALITY_UPDATE,
SPAN_HANDLE_LIGHT_CLIENT_OPTIMISTIC_UPDATE, SPAN_HANDLE_LIGHT_CLIENT_UPDATES_BY_RANGE,
};
use methods::LightClientUpdatesByRangeRequest; use methods::LightClientUpdatesByRangeRequest;
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet, hash_map::Entry}; use std::collections::{HashMap, HashSet, hash_map::Entry};
@@ -163,7 +156,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Handle a `BlocksByRoot` request from the peer. /// Handle a `BlocksByRoot` request from the peer.
#[instrument( #[instrument(
name = SPAN_HANDLE_BLOCKS_BY_ROOT_REQUEST, name = "lh_handle_blocks_by_root_request",
parent = None, parent = None,
level = "debug", level = "debug",
skip_all, skip_all,
@@ -263,7 +256,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Handle a `BlobsByRoot` request from the peer. /// Handle a `BlobsByRoot` request from the peer.
#[instrument( #[instrument(
name = SPAN_HANDLE_BLOBS_BY_ROOT_REQUEST, name = "lh_handle_blobs_by_root_request",
parent = None, parent = None,
level = "debug", level = "debug",
skip_all, skip_all,
@@ -392,7 +385,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Handle a `DataColumnsByRoot` request from the peer. /// Handle a `DataColumnsByRoot` request from the peer.
#[instrument( #[instrument(
name = SPAN_HANDLE_DATA_COLUMNS_BY_ROOT_REQUEST, name = "lh_handle_data_columns_by_root_request",
parent = None, parent = None,
level = "debug", level = "debug",
skip_all, skip_all,
@@ -485,7 +478,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
} }
#[instrument( #[instrument(
name = SPAN_HANDLE_LIGHT_CLIENT_UPDATES_BY_RANGE, name = "lh_handle_light_client_updates_by_range",
parent = None, parent = None,
level = "debug", level = "debug",
skip_all, skip_all,
@@ -586,7 +579,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Handle a `LightClientBootstrap` request from the peer. /// Handle a `LightClientBootstrap` request from the peer.
#[instrument( #[instrument(
name = SPAN_HANDLE_LIGHT_CLIENT_BOOTSTRAP, name = "lh_handle_light_client_bootstrap",
parent = None, parent = None,
level = "debug", level = "debug",
skip_all, skip_all,
@@ -626,7 +619,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Handle a `LightClientOptimisticUpdate` request from the peer. /// Handle a `LightClientOptimisticUpdate` request from the peer.
#[instrument( #[instrument(
name = SPAN_HANDLE_LIGHT_CLIENT_OPTIMISTIC_UPDATE, name = "lh_handle_light_client_optimistic_update",
parent = None, parent = None,
level = "debug", level = "debug",
skip_all, skip_all,
@@ -660,7 +653,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Handle a `LightClientFinalityUpdate` request from the peer. /// Handle a `LightClientFinalityUpdate` request from the peer.
#[instrument( #[instrument(
name = SPAN_HANDLE_LIGHT_CLIENT_FINALITY_UPDATE, name = "lh_handle_light_client_finality_update",
parent = None, parent = None,
level = "debug", level = "debug",
skip_all, skip_all,
@@ -694,7 +687,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Handle a `BlocksByRange` request from the peer. /// Handle a `BlocksByRange` request from the peer.
#[instrument( #[instrument(
name = SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST, name = "lh_handle_blocks_by_range_request",
parent = None, parent = None,
level = "debug", level = "debug",
skip_all, skip_all,
@@ -990,7 +983,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Handle a `BlobsByRange` request from the peer. /// Handle a `BlobsByRange` request from the peer.
#[instrument( #[instrument(
name = SPAN_HANDLE_BLOBS_BY_RANGE_REQUEST, name = "lh_handle_blobs_by_range_request",
parent = None, parent = None,
skip_all, skip_all,
level = "debug", level = "debug",
@@ -1155,7 +1148,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Handle a `DataColumnsByRange` request from the peer. /// Handle a `DataColumnsByRange` request from the peer.
#[instrument( #[instrument(
name = SPAN_HANDLE_DATA_COLUMNS_BY_RANGE_REQUEST, name = "lh_handle_data_columns_by_range_request",
parent = None, parent = None,
skip_all, skip_all,
level = "debug", level = "debug",

View File

@@ -21,11 +21,6 @@ use beacon_processor::{
use beacon_processor::{Work, WorkEvent}; use beacon_processor::{Work, WorkEvent};
use lighthouse_network::PeerAction; use lighthouse_network::PeerAction;
use lighthouse_network::service::api_types::CustodyBackfillBatchId; use lighthouse_network::service::api_types::CustodyBackfillBatchId;
use lighthouse_tracing::{
SPAN_CUSTODY_BACKFILL_SYNC_IMPORT_COLUMNS, SPAN_PROCESS_CHAIN_SEGMENT,
SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL, SPAN_PROCESS_RPC_BLOBS, SPAN_PROCESS_RPC_BLOCK,
SPAN_PROCESS_RPC_CUSTODY_COLUMNS,
};
use logging::crit; use logging::crit;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@@ -107,7 +102,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Attempt to process a block received from a direct RPC request. /// Attempt to process a block received from a direct RPC request.
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[instrument( #[instrument(
name = SPAN_PROCESS_RPC_BLOCK, name = "lh_process_rpc_block",
parent = None, parent = None,
level = "debug", level = "debug",
skip_all, skip_all,
@@ -261,7 +256,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Attempt to process a list of blobs received from a direct RPC request. /// Attempt to process a list of blobs received from a direct RPC request.
#[instrument( #[instrument(
name = SPAN_PROCESS_RPC_BLOBS, name = "lh_process_rpc_blobs",
parent = None, parent = None,
level = "debug", level = "debug",
skip_all, skip_all,
@@ -349,7 +344,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
} }
#[instrument( #[instrument(
name = SPAN_PROCESS_RPC_CUSTODY_COLUMNS, name = "lh_process_rpc_custody_columns",
parent = None, parent = None,
level = "debug", level = "debug",
skip_all, skip_all,
@@ -429,7 +424,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
expected_cgc: u64, expected_cgc: u64,
) { ) {
let _guard = debug_span!( let _guard = debug_span!(
SPAN_CUSTODY_BACKFILL_SYNC_IMPORT_COLUMNS, "lh_custody_backfill_sync_import_columns",
epoch = %batch_id.epoch, epoch = %batch_id.epoch,
columns_received_count = downloaded_columns.len() columns_received_count = downloaded_columns.len()
) )
@@ -524,7 +519,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
/// thread if more blocks are needed to process it. /// thread if more blocks are needed to process it.
#[instrument( #[instrument(
name = SPAN_PROCESS_CHAIN_SEGMENT, name = "lh_process_chain_segment",
parent = None, parent = None,
level = "debug", level = "debug",
skip_all, skip_all,
@@ -605,7 +600,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
/// thread if more blocks are needed to process it. /// thread if more blocks are needed to process it.
#[instrument( #[instrument(
name = SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL, name = "lh_process_chain_segment_backfill",
parent = None, parent = None,
level = "debug", level = "debug",
skip_all, skip_all,

View File

@@ -7,7 +7,6 @@ use crate::sync::network_context::{
use beacon_chain::{BeaconChainTypes, BlockProcessStatus}; use beacon_chain::{BeaconChainTypes, BlockProcessStatus};
use educe::Educe; use educe::Educe;
use lighthouse_network::service::api_types::Id; use lighthouse_network::service::api_types::Id;
use lighthouse_tracing::SPAN_SINGLE_BLOCK_LOOKUP;
use parking_lot::RwLock; use parking_lot::RwLock;
use std::collections::HashSet; use std::collections::HashSet;
use std::fmt::Debug; use std::fmt::Debug;
@@ -93,7 +92,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
awaiting_parent: Option<Hash256>, awaiting_parent: Option<Hash256>,
) -> Self { ) -> Self {
let lookup_span = debug_span!( let lookup_span = debug_span!(
SPAN_SINGLE_BLOCK_LOOKUP, "lh_single_block_lookup",
block_root = %requested_block_root, block_root = %requested_block_root,
id = id, id = id,
); );

View File

@@ -10,7 +10,6 @@ use lighthouse_network::{
service::api_types::{CustodyBackFillBatchRequestId, CustodyBackfillBatchId}, service::api_types::{CustodyBackFillBatchRequestId, CustodyBackfillBatchId},
types::CustodyBackFillState, types::CustodyBackFillState,
}; };
use lighthouse_tracing::SPAN_CUSTODY_BACKFILL_SYNC_BATCH_REQUEST;
use logging::crit; use logging::crit;
use std::hash::{DefaultHasher, Hash, Hasher}; use std::hash::{DefaultHasher, Hash, Hasher};
use tracing::{debug, error, info, info_span, warn}; use tracing::{debug, error, info, info_span, warn};
@@ -1004,7 +1003,7 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
network: &mut SyncNetworkContext<T>, network: &mut SyncNetworkContext<T>,
batch_id: BatchId, batch_id: BatchId,
) -> Result<(), CustodyBackfillError> { ) -> Result<(), CustodyBackfillError> {
let span = info_span!(SPAN_CUSTODY_BACKFILL_SYNC_BATCH_REQUEST); let span = info_span!("lh_custody_backfill_sync_batch_request");
let _enter = span.enter(); let _enter = span.enter();
if let Some(batch) = self.batches.get_mut(&batch_id) { if let Some(batch) = self.batches.get_mut(&batch_id) {

View File

@@ -31,7 +31,6 @@ use lighthouse_network::service::api_types::{
DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId,
}; };
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource};
use lighthouse_tracing::{SPAN_OUTGOING_BLOCK_BY_ROOT_REQUEST, SPAN_OUTGOING_RANGE_REQUEST};
use parking_lot::RwLock; use parking_lot::RwLock;
pub use requests::LookupVerifyError; pub use requests::LookupVerifyError;
use requests::{ use requests::{
@@ -546,7 +545,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
) -> Result<Id, RpcRequestSendError> { ) -> Result<Id, RpcRequestSendError> {
let range_request_span = debug_span!( let range_request_span = debug_span!(
parent: None, parent: None,
SPAN_OUTGOING_RANGE_REQUEST, "lh_outgoing_range_request",
range_req_id = %requester, range_req_id = %requester,
block_peers = block_peers.len(), block_peers = block_peers.len(),
column_peers = column_peers.len() column_peers = column_peers.len()
@@ -908,7 +907,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let request_span = debug_span!( let request_span = debug_span!(
parent: Span::current(), parent: Span::current(),
SPAN_OUTGOING_BLOCK_BY_ROOT_REQUEST, "lh_outgoing_block_by_root_request",
%block_root, %block_root,
); );
self.blocks_by_root_requests.insert( self.blocks_by_root_requests.insert(

View File

@@ -6,7 +6,6 @@ use beacon_chain::validator_monitor::timestamp_now;
use fnv::FnvHashMap; use fnv::FnvHashMap;
use lighthouse_network::PeerId; use lighthouse_network::PeerId;
use lighthouse_network::service::api_types::{CustodyId, DataColumnsByRootRequester}; use lighthouse_network::service::api_types::{CustodyId, DataColumnsByRootRequester};
use lighthouse_tracing::SPAN_OUTGOING_CUSTODY_REQUEST;
use parking_lot::RwLock; use parking_lot::RwLock;
use std::collections::HashSet; use std::collections::HashSet;
use std::hash::{BuildHasher, RandomState}; use std::hash::{BuildHasher, RandomState};
@@ -69,7 +68,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
) -> Self { ) -> Self {
let span = debug_span!( let span = debug_span!(
parent: Span::current(), parent: Span::current(),
SPAN_OUTGOING_CUSTODY_REQUEST, "lh_outgoing_custody_request",
%block_root, %block_root,
); );
Self { Self {

View File

@@ -12,7 +12,6 @@ use beacon_chain::BeaconChainTypes;
use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::block_verification_types::RpcBlock;
use lighthouse_network::service::api_types::Id; use lighthouse_network::service::api_types::Id;
use lighthouse_network::{PeerAction, PeerId}; use lighthouse_network::{PeerAction, PeerId};
use lighthouse_tracing::SPAN_SYNCING_CHAIN;
use logging::crit; use logging::crit;
use std::collections::{BTreeMap, HashSet, btree_map::Entry}; use std::collections::{BTreeMap, HashSet, btree_map::Entry};
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
@@ -161,7 +160,7 @@ pub enum ChainSyncingState {
impl<T: BeaconChainTypes> SyncingChain<T> { impl<T: BeaconChainTypes> SyncingChain<T> {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[instrument( #[instrument(
name = SPAN_SYNCING_CHAIN, name = "lh_syncing_chain",
parent = None, parent = None,
level="debug", level="debug",
skip_all, skip_all,

View File

@@ -0,0 +1,8 @@
[package]
name = "tracing_samplers"
version = "0.1.0"
edition = { workspace = true }
[dependencies]
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }

View File

@@ -0,0 +1,78 @@
//! OpenTelemetry samplers for filtering traces.
use opentelemetry::Context;
use opentelemetry::trace::{Link, SamplingDecision, SamplingResult, SpanKind, TraceState};
use opentelemetry_sdk::trace::ShouldSample;
/// A sampler that admits only spans whose names begin with a fixed prefix.
///
/// Intended to be wrapped in `Sampler::ParentBased`, which:
/// - consults this sampler for root spans (those with no parent), and
/// - lets child spans simply inherit their parent's sampling decision.
///
/// The net effect is that only traces rooted in deliberately instrumented
/// code paths are exported, cutting noise from partially instrumented code.
#[derive(Debug, Clone)]
pub struct PrefixBasedSampler {
    /// Required leading substring of a root span's name for it to be sampled.
    prefix: &'static str,
}

impl PrefixBasedSampler {
    /// Creates a sampler that accepts span names starting with `prefix`.
    pub fn new(prefix: &'static str) -> Self {
        PrefixBasedSampler { prefix }
    }
}
impl ShouldSample for PrefixBasedSampler {
    /// Samples a span iff its `name` starts with the configured prefix.
    ///
    /// All other inputs (parent context, trace id, kind, attributes, links)
    /// are ignored: the decision is purely name-based. Parent inheritance is
    /// expected to be handled by a wrapping `Sampler::ParentBased`.
    fn should_sample(
        &self,
        _parent_context: Option<&Context>,
        _trace_id: opentelemetry::trace::TraceId,
        name: &str,
        _span_kind: &SpanKind,
        _attributes: &[opentelemetry::KeyValue],
        _links: &[Link],
    ) -> SamplingResult {
        // Decide once, then build a single result: record-and-sample on a
        // prefix match, otherwise drop the span entirely.
        let decision = if name.starts_with(self.prefix) {
            SamplingDecision::RecordAndSample
        } else {
            SamplingDecision::Drop
        };
        SamplingResult {
            decision,
            attributes: Vec::new(),
            trace_state: TraceState::default(),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry::trace::TraceId;

    /// Runs the sampler against `name` with fixed, irrelevant trace inputs
    /// and returns just the decision, so each test reads as a one-liner.
    fn sample(sampler: &PrefixBasedSampler, name: &str) -> SamplingDecision {
        let trace_id = TraceId::from_hex("0123456789abcdef0123456789abcdef").unwrap();
        sampler
            .should_sample(None, trace_id, name, &SpanKind::Internal, &[], &[])
            .decision
    }

    #[test]
    fn prefix_based_sampler_filters_by_prefix() {
        let sampler = PrefixBasedSampler::new("test_");
        // Spans with the prefix should be sampled.
        assert!(matches!(
            sample(&sampler, "test_my_span"),
            SamplingDecision::RecordAndSample
        ));
        // Spans without the prefix should be dropped.
        assert!(matches!(
            sample(&sampler, "other_span"),
            SamplingDecision::Drop
        ));
    }

    #[test]
    fn name_equal_to_prefix_is_sampled() {
        // A name that is exactly the prefix still starts with it.
        let sampler = PrefixBasedSampler::new("test_");
        assert!(matches!(
            sample(&sampler, "test_"),
            SamplingDecision::RecordAndSample
        ));
    }

    #[test]
    fn empty_prefix_samples_everything() {
        // `str::starts_with("")` is always true, so an empty prefix
        // effectively disables filtering — worth pinning down explicitly.
        let sampler = PrefixBasedSampler::new("");
        assert!(matches!(
            sample(&sampler, "anything"),
            SamplingDecision::RecordAndSample
        ));
    }
}

View File

@@ -53,7 +53,6 @@ environment = { workspace = true }
eth2_network_config = { workspace = true } eth2_network_config = { workspace = true }
ethereum_hashing = { workspace = true } ethereum_hashing = { workspace = true }
futures = { workspace = true } futures = { workspace = true }
lighthouse_tracing = { workspace = true }
lighthouse_version = { workspace = true } lighthouse_version = { workspace = true }
logging = { workspace = true } logging = { workspace = true }
metrics = { workspace = true } metrics = { workspace = true }
@@ -70,6 +69,7 @@ task_executor = { workspace = true }
tracing = { workspace = true } tracing = { workspace = true }
tracing-opentelemetry = { workspace = true } tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true } tracing-subscriber = { workspace = true }
tracing_samplers = { workspace = true }
types = { workspace = true } types = { workspace = true }
validator_client = { workspace = true } validator_client = { workspace = true }
validator_manager = { path = "../validator_manager" } validator_manager = { path = "../validator_manager" }

View File

@@ -29,6 +29,7 @@ use std::process::exit;
use std::sync::LazyLock; use std::sync::LazyLock;
use task_executor::ShutdownReason; use task_executor::ShutdownReason;
use tracing::{Level, info}; use tracing::{Level, info};
use tracing_samplers::PrefixBasedSampler;
use tracing_subscriber::{Layer, filter::EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{Layer, filter::EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};
use types::{EthSpec, EthSpecId}; use types::{EthSpec, EthSpecId};
use validator_client::ProductionValidatorClient; use validator_client::ProductionValidatorClient;
@@ -675,7 +676,15 @@ fn run<E: EthSpec>(
_ => "lighthouse".to_string(), _ => "lighthouse".to_string(),
}); });
// Use ParentBased sampler with PrefixBasedSampler to only export traces
// that start from known instrumented code paths (lh_ prefix). Child spans
// automatically inherit their parent's sampling decision.
let sampler = opentelemetry_sdk::trace::Sampler::ParentBased(Box::new(
PrefixBasedSampler::new("lh_"),
));
let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder() let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_sampler(sampler)
.with_batch_exporter(exporter) .with_batch_exporter(exporter)
.with_resource( .with_resource(
opentelemetry_sdk::Resource::builder() opentelemetry_sdk::Resource::builder()