mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-06 10:11:44 +00:00
Instrument tracing on block production code path (#8017)
Partially #7814. Instrument block production code path. New root spans: * `produce_block_v3` * `produce_block_v2` Example traces: <img width="518" height="432" alt="image" src="https://github.com/user-attachments/assets/a9413d25-501c-49dc-95cc-623db5988981" /> Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>
This commit is contained in:
@@ -1437,6 +1437,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
///
|
||||
/// Returns `None` when the state is not found in the database or there is an error skipping
|
||||
/// to a future state.
|
||||
#[instrument(level = "debug", skip_all)]
|
||||
pub fn state_at_slot(
|
||||
&self,
|
||||
slot: Slot,
|
||||
@@ -4466,6 +4467,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
|
||||
/// If configured, wait for the fork choice run at the start of the slot to complete.
|
||||
#[instrument(level = "debug", skip_all)]
|
||||
fn wait_for_fork_choice_before_block_production(
|
||||
self: &Arc<Self>,
|
||||
slot: Slot,
|
||||
@@ -4528,10 +4530,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
//
|
||||
// Load the parent state from disk.
|
||||
let chain = self.clone();
|
||||
let span = Span::current();
|
||||
let (state, state_root_opt) = self
|
||||
.task_executor
|
||||
.spawn_blocking_handle(
|
||||
move || chain.load_state_for_block_production(slot),
|
||||
move || {
|
||||
let _guard =
|
||||
debug_span!(parent: span, "load_state_for_block_production").entered();
|
||||
chain.load_state_for_block_production(slot)
|
||||
},
|
||||
"load_state_for_block_production",
|
||||
)
|
||||
.ok_or(BlockProductionError::ShuttingDown)?
|
||||
@@ -4618,6 +4625,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
/// Fetch the beacon state to use for producing a block if a 1-slot proposer re-org is viable.
|
||||
///
|
||||
/// This function will return `None` if proposer re-orgs are disabled.
|
||||
#[instrument(skip_all, level = "debug")]
|
||||
fn get_state_for_re_org(
|
||||
&self,
|
||||
slot: Slot,
|
||||
@@ -5072,6 +5080,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
/// equal to the root of `state`. Providing this value will serve as an optimization to avoid
|
||||
/// performing a tree hash in some scenarios.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(level = "debug", skip_all)]
|
||||
pub async fn produce_block_on_state(
|
||||
self: &Arc<Self>,
|
||||
state: BeaconState<T::EthSpec>,
|
||||
@@ -5091,10 +5100,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
.graffiti_calculator
|
||||
.get_graffiti(validator_graffiti)
|
||||
.await;
|
||||
let span = Span::current();
|
||||
let mut partial_beacon_block = self
|
||||
.task_executor
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
let _guard =
|
||||
debug_span!(parent: span, "produce_partial_beacon_block").entered();
|
||||
chain.produce_partial_beacon_block(
|
||||
state,
|
||||
state_root_opt,
|
||||
@@ -5130,10 +5142,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
match block_contents_type {
|
||||
BlockProposalContentsType::Full(block_contents) => {
|
||||
let chain = self.clone();
|
||||
let span = Span::current();
|
||||
let beacon_block_response = self
|
||||
.task_executor
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
let _guard =
|
||||
debug_span!(parent: span, "complete_partial_beacon_block")
|
||||
.entered();
|
||||
chain.complete_partial_beacon_block(
|
||||
partial_beacon_block,
|
||||
Some(block_contents),
|
||||
@@ -5150,10 +5166,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
BlockProposalContentsType::Blinded(block_contents) => {
|
||||
let chain = self.clone();
|
||||
let span = Span::current();
|
||||
let beacon_block_response = self
|
||||
.task_executor
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
let _guard =
|
||||
debug_span!(parent: span, "complete_partial_beacon_block")
|
||||
.entered();
|
||||
chain.complete_partial_beacon_block(
|
||||
partial_beacon_block,
|
||||
Some(block_contents),
|
||||
@@ -5171,10 +5191,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
} else {
|
||||
let chain = self.clone();
|
||||
let span = Span::current();
|
||||
let beacon_block_response = self
|
||||
.task_executor
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
let _guard =
|
||||
debug_span!(parent: span, "complete_partial_beacon_block").entered();
|
||||
chain.complete_partial_beacon_block(
|
||||
partial_beacon_block,
|
||||
None,
|
||||
@@ -5276,51 +5299,54 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
|
||||
// Iterate through the naive aggregation pool and ensure all the attestations from there
|
||||
// are included in the operation pool.
|
||||
let unagg_import_timer =
|
||||
metrics::start_timer(&metrics::BLOCK_PRODUCTION_UNAGGREGATED_TIMES);
|
||||
for attestation in self.naive_aggregation_pool.read().iter() {
|
||||
let import = |attestation: &Attestation<T::EthSpec>| {
|
||||
let attesting_indices =
|
||||
get_attesting_indices_from_state(&state, attestation.to_ref())?;
|
||||
self.op_pool
|
||||
.insert_attestation(attestation.clone(), attesting_indices)
|
||||
};
|
||||
if let Err(e) = import(attestation) {
|
||||
// Don't stop block production if there's an error, just create a log.
|
||||
error!(
|
||||
reason = ?e,
|
||||
"Attestation did not transfer to op pool"
|
||||
);
|
||||
{
|
||||
let _guard = debug_span!("import_naive_aggregation_pool").entered();
|
||||
let _unagg_import_timer =
|
||||
metrics::start_timer(&metrics::BLOCK_PRODUCTION_UNAGGREGATED_TIMES);
|
||||
for attestation in self.naive_aggregation_pool.read().iter() {
|
||||
let import = |attestation: &Attestation<T::EthSpec>| {
|
||||
let attesting_indices =
|
||||
get_attesting_indices_from_state(&state, attestation.to_ref())?;
|
||||
self.op_pool
|
||||
.insert_attestation(attestation.clone(), attesting_indices)
|
||||
};
|
||||
if let Err(e) = import(attestation) {
|
||||
// Don't stop block production if there's an error, just create a log.
|
||||
error!(
|
||||
reason = ?e,
|
||||
"Attestation did not transfer to op pool"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
drop(unagg_import_timer);
|
||||
|
||||
let attestation_packing_timer =
|
||||
metrics::start_timer(&metrics::BLOCK_PRODUCTION_ATTESTATION_TIMES);
|
||||
|
||||
// Epoch cache and total balance cache are required for op pool packing.
|
||||
state.build_total_active_balance_cache(&self.spec)?;
|
||||
initialize_epoch_cache(&mut state, &self.spec)?;
|
||||
|
||||
let mut prev_filter_cache = HashMap::new();
|
||||
let prev_attestation_filter = |att: &CompactAttestationRef<T::EthSpec>| {
|
||||
self.filter_op_pool_attestation(&mut prev_filter_cache, att, &state)
|
||||
};
|
||||
let mut curr_filter_cache = HashMap::new();
|
||||
let curr_attestation_filter = |att: &CompactAttestationRef<T::EthSpec>| {
|
||||
self.filter_op_pool_attestation(&mut curr_filter_cache, att, &state)
|
||||
};
|
||||
|
||||
let mut attestations = self
|
||||
.op_pool
|
||||
.get_attestations(
|
||||
&state,
|
||||
prev_attestation_filter,
|
||||
curr_attestation_filter,
|
||||
&self.spec,
|
||||
)
|
||||
.map_err(BlockProductionError::OpPoolError)?;
|
||||
drop(attestation_packing_timer);
|
||||
let mut attestations = {
|
||||
let _guard = debug_span!("pack_attestations").entered();
|
||||
let _attestation_packing_timer =
|
||||
metrics::start_timer(&metrics::BLOCK_PRODUCTION_ATTESTATION_TIMES);
|
||||
|
||||
// Epoch cache and total balance cache are required for op pool packing.
|
||||
state.build_total_active_balance_cache(&self.spec)?;
|
||||
initialize_epoch_cache(&mut state, &self.spec)?;
|
||||
|
||||
let mut prev_filter_cache = HashMap::new();
|
||||
let prev_attestation_filter = |att: &CompactAttestationRef<T::EthSpec>| {
|
||||
self.filter_op_pool_attestation(&mut prev_filter_cache, att, &state)
|
||||
};
|
||||
let mut curr_filter_cache = HashMap::new();
|
||||
let curr_attestation_filter = |att: &CompactAttestationRef<T::EthSpec>| {
|
||||
self.filter_op_pool_attestation(&mut curr_filter_cache, att, &state)
|
||||
};
|
||||
|
||||
self.op_pool
|
||||
.get_attestations(
|
||||
&state,
|
||||
prev_attestation_filter,
|
||||
curr_attestation_filter,
|
||||
&self.spec,
|
||||
)
|
||||
.map_err(BlockProductionError::OpPoolError)?
|
||||
};
|
||||
|
||||
// If paranoid mode is enabled re-check the signatures of every included message.
|
||||
// This will be a lot slower but guards against bugs in block production and can be
|
||||
|
||||
@@ -24,7 +24,7 @@ use state_processing::per_block_processing::{
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::{debug, warn};
|
||||
use tracing::{Instrument, debug, debug_span, warn};
|
||||
use tree_hash::TreeHash;
|
||||
use types::payload::BlockProductionVersion;
|
||||
use types::*;
|
||||
@@ -403,8 +403,9 @@ pub fn get_execution_payload<T: BeaconChainTypes>(
|
||||
block_production_version,
|
||||
)
|
||||
.await
|
||||
},
|
||||
"get_execution_payload",
|
||||
}
|
||||
.instrument(debug_span!("prepare_execution_payload")),
|
||||
"prepare_execution_payload",
|
||||
)
|
||||
.ok_or(BlockProductionError::ShuttingDown)?;
|
||||
|
||||
@@ -503,6 +504,7 @@ where
|
||||
},
|
||||
"prepare_execution_payload_forkchoice_update_params",
|
||||
)
|
||||
.instrument(debug_span!("forkchoice_update_params"))
|
||||
.await
|
||||
.map_err(|e| BlockProductionError::BeaconChain(Box::new(e)))?;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user