Implement tracing spans for data column RPC requests and responses (#7831)

#7830
This commit is contained in:
Jimmy Chen
2025-08-21 09:35:51 +10:00
committed by GitHub
parent 2d223575d6
commit f19d4f6af1
18 changed files with 491 additions and 66 deletions

View File

@@ -293,6 +293,7 @@ async fn process_block(&self, block: Block) -> Result<(), Error> {
## Build and Development Notes
- Full builds and tests take 5+ minutes - use large timeouts (300s+) for any `cargo build`, `cargo test`, or `make` commands
- Use `cargo check` for faster iteration during development, and always run it after code changes
- Use `cargo fmt --all && make lint-fix` to format code and fix linting issues once a task is complete
- Prefer targeted package tests (`cargo test -p <package>`) and individual tests over full test suite when debugging specific issues
- Always understand the broader codebase patterns before making changes
- Minimum Supported Rust Version (MSRV) is documented in `lighthouse/Cargo.toml` - ensure Rust version meets or exceeds this requirement

7
Cargo.lock generated
View File

@@ -887,6 +887,7 @@ dependencies = [
"int_to_bytes",
"itertools 0.10.5",
"kzg",
"lighthouse_tracing",
"lighthouse_version",
"logging",
"lru",
@@ -5663,6 +5664,7 @@ dependencies = [
"futures",
"initialized_validators",
"lighthouse_network",
"lighthouse_tracing",
"lighthouse_version",
"logging",
"malloc_utils",
@@ -5745,6 +5747,10 @@ dependencies = [
"unused_port",
]
[[package]]
name = "lighthouse_tracing"
version = "0.1.0"
[[package]]
name = "lighthouse_validator_store"
version = "0.1.0"
@@ -6412,6 +6418,7 @@ dependencies = [
"kzg",
"libp2p-gossipsub",
"lighthouse_network",
"lighthouse_tracing",
"logging",
"lru_cache",
"matches",

View File

@@ -11,6 +11,7 @@ members = [
"beacon_node/http_api",
"beacon_node/http_metrics",
"beacon_node/lighthouse_network",
"beacon_node/lighthouse_tracing",
"beacon_node/network",
"beacon_node/operation_pool",
"beacon_node/store",
@@ -173,6 +174,7 @@ itertools = "0.10"
kzg = { path = "crypto/kzg" }
libsecp256k1 = "0.7"
lighthouse_network = { path = "beacon_node/lighthouse_network" }
lighthouse_tracing = { path = "beacon_node/lighthouse_tracing" }
lighthouse_validator_store = { path = "validator_client/lighthouse_validator_store" }
lighthouse_version = { path = "common/lighthouse_version" }
lockfile = { path = "common/lockfile" }

View File

@@ -33,6 +33,7 @@ hex = { workspace = true }
int_to_bytes = { workspace = true }
itertools = { workspace = true }
kzg = { workspace = true }
lighthouse_tracing = { workspace = true }
lighthouse_version = { workspace = true }
logging = { workspace = true }
lru = { workspace = true }

View File

@@ -826,7 +826,7 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
/// on the p2p network.
///
/// Returns an error if the block is invalid, or if the block was unable to be verified.
#[instrument(name = "verify_gossip_block", skip_all)]
#[instrument(name = "verify_gossip_block", skip_all, fields(block_root = tracing::field::Empty))]
pub fn new(
block: Arc<SignedBeaconBlock<T::EthSpec>>,
chain: &BeaconChain<T>,
@@ -1227,12 +1227,9 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
signature_verifier
.include_all_signatures_except_proposal(block.as_ref(), &mut consensus_context)?;
let sig_verify_span = info_span!("signature_verify", result = "started").entered();
let result = signature_verifier.verify();
let result = info_span!("signature_verify").in_scope(|| signature_verifier.verify());
match result {
Ok(_) => {
sig_verify_span.record("result", "ok");
Ok(Self {
Ok(_) => Ok(Self {
block: MaybeAvailableBlock::AvailabilityPending {
block_root: from.block_root,
block,
@@ -1240,14 +1237,10 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
block_root: from.block_root,
parent: Some(parent),
consensus_context,
})
}
Err(_) => {
sig_verify_span.record("result", "fail");
Err(BlockError::InvalidSignature(
}),
Err(_) => Err(BlockError::InvalidSignature(
InvalidSignature::BlockBodySignatures,
))
}
)),
}
}

View File

@@ -9,6 +9,7 @@ use crate::block_verification_types::{
};
use crate::data_availability_checker::{Availability, AvailabilityCheckError};
use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
use lighthouse_tracing::SPAN_PENDING_COMPONENTS;
use lru::LruCache;
use parking_lot::RwLock;
use std::cmp::Ordering;
@@ -288,7 +289,7 @@ impl<E: EthSpec> PendingComponents<E> {
/// Returns an empty `PendingComponents` object with the given block root.
pub fn empty(block_root: Hash256, max_len: usize) -> Self {
let span = debug_span!(parent: None, "pending_components", %block_root);
let span = debug_span!(parent: None, SPAN_PENDING_COMPONENTS, %block_root);
let _guard = span.clone().entered();
Self {
block_root,

View File

@@ -0,0 +1,4 @@
[package]
name = "lighthouse_tracing"
version = "0.1.0"
edition = { workspace = true }

View File

@@ -0,0 +1,61 @@
//! This module contains root span identifiers for key code paths in the beacon node.
//!
//! TODO: These span identifiers will be used to implement selective tracing export (to be implemented),
//! where only the listed root spans and their descendants will be exported to the tracing backend.
/// Data Availability checker span identifiers
pub const SPAN_PENDING_COMPONENTS: &str = "pending_components";
/// Gossip methods root spans
pub const SPAN_PROCESS_GOSSIP_DATA_COLUMN: &str = "process_gossip_data_column";
pub const SPAN_PROCESS_GOSSIP_BLOB: &str = "process_gossip_blob";
pub const SPAN_PROCESS_GOSSIP_BLOCK: &str = "process_gossip_block";
/// Sync methods root spans
pub const SPAN_SYNCING_CHAIN: &str = "syncing_chain";
pub const SPAN_OUTGOING_RANGE_REQUEST: &str = "outgoing_range_request";
pub const SPAN_OUTGOING_CUSTODY_REQUEST: &str = "outgoing_custody_request";
pub const SPAN_PROCESS_RPC_BLOCK: &str = "process_rpc_block";
pub const SPAN_PROCESS_RPC_BLOBS: &str = "process_rpc_blobs";
pub const SPAN_PROCESS_RPC_CUSTODY_COLUMNS: &str = "process_rpc_custody_columns";
pub const SPAN_PROCESS_CHAIN_SEGMENT: &str = "process_chain_segment";
/// RPC methods root spans
pub const SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST: &str = "handle_blocks_by_range_request";
pub const SPAN_HANDLE_BLOBS_BY_RANGE_REQUEST: &str = "handle_blobs_by_range_request";
pub const SPAN_HANDLE_DATA_COLUMNS_BY_RANGE_REQUEST: &str = "handle_data_columns_by_range_request";
pub const SPAN_HANDLE_BLOCKS_BY_ROOT_REQUEST: &str = "handle_blocks_by_root_request";
pub const SPAN_HANDLE_BLOBS_BY_ROOT_REQUEST: &str = "handle_blobs_by_root_request";
pub const SPAN_HANDLE_DATA_COLUMNS_BY_ROOT_REQUEST: &str = "handle_data_columns_by_root_request";
pub const SPAN_HANDLE_LIGHT_CLIENT_UPDATES_BY_RANGE: &str = "handle_light_client_updates_by_range";
pub const SPAN_HANDLE_LIGHT_CLIENT_BOOTSTRAP: &str = "handle_light_client_bootstrap";
pub const SPAN_HANDLE_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str =
"handle_light_client_optimistic_update";
pub const SPAN_HANDLE_LIGHT_CLIENT_FINALITY_UPDATE: &str = "handle_light_client_finality_update";
/// List of all root span names that are allowed to be exported to the tracing backend.
/// Only these spans and their descendants will be processed to reduce noise from
/// uninstrumented code paths. New root spans must be added to this list to be traced.
pub const LH_BN_ROOT_SPAN_NAMES: &[&str] = &[
SPAN_SYNCING_CHAIN,
SPAN_PENDING_COMPONENTS,
SPAN_PROCESS_GOSSIP_DATA_COLUMN,
SPAN_PROCESS_GOSSIP_BLOB,
SPAN_PROCESS_GOSSIP_BLOCK,
SPAN_OUTGOING_RANGE_REQUEST,
SPAN_OUTGOING_CUSTODY_REQUEST,
SPAN_PROCESS_RPC_BLOCK,
SPAN_PROCESS_RPC_BLOBS,
SPAN_PROCESS_RPC_CUSTODY_COLUMNS,
SPAN_PROCESS_CHAIN_SEGMENT,
SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST,
SPAN_HANDLE_BLOBS_BY_RANGE_REQUEST,
SPAN_HANDLE_DATA_COLUMNS_BY_RANGE_REQUEST,
SPAN_HANDLE_BLOCKS_BY_ROOT_REQUEST,
SPAN_HANDLE_BLOBS_BY_ROOT_REQUEST,
SPAN_HANDLE_DATA_COLUMNS_BY_ROOT_REQUEST,
SPAN_HANDLE_LIGHT_CLIENT_UPDATES_BY_RANGE,
SPAN_HANDLE_LIGHT_CLIENT_BOOTSTRAP,
SPAN_HANDLE_LIGHT_CLIENT_OPTIMISTIC_UPDATE,
SPAN_HANDLE_LIGHT_CLIENT_FINALITY_UPDATE,
];

View File

@@ -28,6 +28,7 @@ hex = { workspace = true }
igd-next = { version = "0.16", features = ["aio_tokio"] }
itertools = { workspace = true }
lighthouse_network = { workspace = true }
lighthouse_tracing = { workspace = true }
logging = { workspace = true }
lru_cache = { workspace = true }
metrics = { workspace = true }

View File

@@ -21,6 +21,9 @@ use beacon_chain::{
};
use beacon_processor::{Work, WorkEvent};
use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource};
use lighthouse_tracing::{
SPAN_PROCESS_GOSSIP_BLOB, SPAN_PROCESS_GOSSIP_BLOCK, SPAN_PROCESS_GOSSIP_DATA_COLUMN,
};
use logging::crit;
use operation_pool::ReceivedPreCapella;
use slot_clock::SlotClock;
@@ -602,7 +605,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
#[instrument(skip_all, level = "trace", fields(slot = ?column_sidecar.slot(), block_root = ?column_sidecar.block_root(), index = column_sidecar.index), parent = None)]
#[instrument(
name = SPAN_PROCESS_GOSSIP_DATA_COLUMN,
parent = None,
level = "debug",
skip_all,
fields(slot = ?column_sidecar.slot(), block_root = ?column_sidecar.block_root(), index = column_sidecar.index),
)]
pub async fn process_gossip_data_column_sidecar(
self: &Arc<Self>,
message_id: MessageId,
@@ -760,7 +769,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, level = "trace", fields(slot = ?blob_sidecar.slot(), block_root = ?blob_sidecar.block_root(), index = blob_sidecar.index), parent = None)]
#[instrument(
name = SPAN_PROCESS_GOSSIP_BLOB,
parent = None,
level = "debug",
skip_all,
fields(
slot = ?blob_sidecar.slot(),
block_root = ?blob_sidecar.block_root(),
index = blob_sidecar.index),
)]
pub async fn process_gossip_blob(
self: &Arc<Self>,
message_id: MessageId,
@@ -1098,7 +1116,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
///
/// Raises a log if there are errors.
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(block_root = tracing::field::Empty), parent = None)]
#[instrument(
name = SPAN_PROCESS_GOSSIP_BLOCK,
parent = None,
level = "debug",
skip_all,
fields(block_root = tracing::field::Empty),
)]
pub async fn process_gossip_block(
self: Arc<Self>,
message_id: MessageId,

View File

@@ -10,14 +10,21 @@ use lighthouse_network::rpc::methods::{
};
use lighthouse_network::rpc::*;
use lighthouse_network::{PeerId, ReportSource, Response, SyncInfo};
use lighthouse_tracing::{
SPAN_HANDLE_BLOBS_BY_RANGE_REQUEST, SPAN_HANDLE_BLOBS_BY_ROOT_REQUEST,
SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST, SPAN_HANDLE_BLOCKS_BY_ROOT_REQUEST,
SPAN_HANDLE_DATA_COLUMNS_BY_RANGE_REQUEST, SPAN_HANDLE_DATA_COLUMNS_BY_ROOT_REQUEST,
SPAN_HANDLE_LIGHT_CLIENT_BOOTSTRAP, SPAN_HANDLE_LIGHT_CLIENT_FINALITY_UPDATE,
SPAN_HANDLE_LIGHT_CLIENT_OPTIMISTIC_UPDATE, SPAN_HANDLE_LIGHT_CLIENT_UPDATES_BY_RANGE,
};
use methods::LightClientUpdatesByRangeRequest;
use slot_clock::SlotClock;
use std::collections::{HashMap, hash_map::Entry};
use std::sync::Arc;
use tokio_stream::StreamExt;
use tracing::{debug, error, instrument, warn};
use tracing::{Span, debug, error, field, instrument, warn};
use types::blob_sidecar::BlobIdentifier;
use types::{Epoch, EthSpec, Hash256, Slot};
use types::{ColumnIndex, Epoch, EthSpec, Hash256, Slot};
impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/* Auxiliary functions */
@@ -155,13 +162,22 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
/// Handle a `BlocksByRoot` request from the peer.
#[instrument(skip_all, level = "debug")]
#[instrument(
name = SPAN_HANDLE_BLOCKS_BY_ROOT_REQUEST,
parent = None,
level = "debug",
skip_all,
fields(peer_id = %peer_id, client = tracing::field::Empty)
)]
pub async fn handle_blocks_by_root_request(
self: Arc<Self>,
peer_id: PeerId,
inbound_request_id: InboundRequestId,
request: BlocksByRootRequest,
) {
let client = self.network_globals.client(&peer_id);
Span::current().record("client", field::display(client.kind));
self.terminate_response_stream(
peer_id,
inbound_request_id,
@@ -246,13 +262,22 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
/// Handle a `BlobsByRoot` request from the peer.
#[instrument(skip_all, level = "debug")]
#[instrument(
name = SPAN_HANDLE_BLOBS_BY_ROOT_REQUEST,
parent = None,
level = "debug",
skip_all,
fields(peer_id = %peer_id, client = tracing::field::Empty)
)]
pub fn handle_blobs_by_root_request(
self: Arc<Self>,
peer_id: PeerId,
inbound_request_id: InboundRequestId,
request: BlobsByRootRequest,
) {
let client = self.network_globals.client(&peer_id);
Span::current().record("client", field::display(client.kind));
self.terminate_response_stream(
peer_id,
inbound_request_id,
@@ -341,13 +366,36 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
/// Handle a `DataColumnsByRoot` request from the peer.
#[instrument(skip_all, level = "debug")]
#[instrument(
name = SPAN_HANDLE_DATA_COLUMNS_BY_ROOT_REQUEST,
parent = None,
level = "debug",
skip_all,
fields(
peer_id = %peer_id,
client = tracing::field::Empty,
non_custody_indices = tracing::field::Empty,
)
)]
pub fn handle_data_columns_by_root_request(
self: Arc<Self>,
peer_id: PeerId,
inbound_request_id: InboundRequestId,
request: DataColumnsByRootRequest<T::EthSpec>,
) {
let requested_columns = request
.data_column_ids
.iter()
.flat_map(|id| id.columns.clone())
.unique()
.collect::<Vec<_>>();
self.record_data_column_request_in_span(
&peer_id,
&requested_columns,
None,
Span::current(),
);
self.terminate_response_stream(
peer_id,
inbound_request_id,
@@ -411,13 +459,22 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Ok(())
}
#[instrument(skip_all, level = "debug")]
#[instrument(
name = SPAN_HANDLE_LIGHT_CLIENT_UPDATES_BY_RANGE,
parent = None,
level = "debug",
skip_all,
fields(peer_id = %peer_id, client = tracing::field::Empty)
)]
pub fn handle_light_client_updates_by_range(
self: &Arc<Self>,
peer_id: PeerId,
inbound_request_id: InboundRequestId,
request: LightClientUpdatesByRangeRequest,
) {
let client = self.network_globals.client(&peer_id);
Span::current().record("client", field::display(client.kind));
self.terminate_response_stream(
peer_id,
inbound_request_id,
@@ -503,13 +560,22 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
/// Handle a `LightClientBootstrap` request from the peer.
#[instrument(skip_all, level = "debug")]
#[instrument(
name = SPAN_HANDLE_LIGHT_CLIENT_BOOTSTRAP,
parent = None,
level = "debug",
skip_all,
fields(peer_id = %peer_id, client = tracing::field::Empty)
)]
pub fn handle_light_client_bootstrap(
self: &Arc<Self>,
peer_id: PeerId,
inbound_request_id: InboundRequestId,
request: LightClientBootstrapRequest,
) {
let client = self.network_globals.client(&peer_id);
Span::current().record("client", field::display(client.kind));
self.terminate_response_single_item(
peer_id,
inbound_request_id,
@@ -534,12 +600,21 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
/// Handle a `LightClientOptimisticUpdate` request from the peer.
#[instrument(skip_all, level = "debug")]
#[instrument(
name = SPAN_HANDLE_LIGHT_CLIENT_OPTIMISTIC_UPDATE,
parent = None,
level = "debug",
skip_all,
fields(peer_id = %peer_id, client = tracing::field::Empty)
)]
pub fn handle_light_client_optimistic_update(
self: &Arc<Self>,
peer_id: PeerId,
inbound_request_id: InboundRequestId,
) {
let client = self.network_globals.client(&peer_id);
Span::current().record("client", field::display(client.kind));
self.terminate_response_single_item(
peer_id,
inbound_request_id,
@@ -559,12 +634,21 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
/// Handle a `LightClientFinalityUpdate` request from the peer.
#[instrument(skip_all, level = "debug")]
#[instrument(
name = SPAN_HANDLE_LIGHT_CLIENT_FINALITY_UPDATE,
parent = None,
level = "debug",
skip_all,
fields(peer_id = %peer_id, client = tracing::field::Empty)
)]
pub fn handle_light_client_finality_update(
self: &Arc<Self>,
peer_id: PeerId,
inbound_request_id: InboundRequestId,
) {
let client = self.network_globals.client(&peer_id);
Span::current().record("client", field::display(client.kind));
self.terminate_response_single_item(
peer_id,
inbound_request_id,
@@ -584,13 +668,22 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
/// Handle a `BlocksByRange` request from the peer.
#[instrument(skip_all, level = "debug")]
#[instrument(
name = SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST,
parent = None,
level = "debug",
skip_all,
fields(peer_id = %peer_id, client = tracing::field::Empty)
)]
pub async fn handle_blocks_by_range_request(
self: Arc<Self>,
peer_id: PeerId,
inbound_request_id: InboundRequestId,
req: BlocksByRangeRequest,
) {
let client = self.network_globals.client(&peer_id);
Span::current().record("client", field::display(client.kind));
self.terminate_response_stream(
peer_id,
inbound_request_id,
@@ -871,13 +964,22 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
/// Handle a `BlobsByRange` request from the peer.
#[instrument(skip_all, level = "debug")]
#[instrument(
name = SPAN_HANDLE_BLOBS_BY_RANGE_REQUEST,
parent = None,
skip_all,
level = "debug",
fields(peer_id = %peer_id, client = tracing::field::Empty)
)]
pub fn handle_blobs_by_range_request(
self: Arc<Self>,
peer_id: PeerId,
inbound_request_id: InboundRequestId,
req: BlobsByRangeRequest,
) {
let client = self.network_globals.client(&peer_id);
Span::current().record("client", field::display(client.kind));
self.terminate_response_stream(
peer_id,
inbound_request_id,
@@ -999,13 +1101,27 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
/// Handle a `DataColumnsByRange` request from the peer.
#[instrument(skip_all, level = "debug")]
#[instrument(
name = SPAN_HANDLE_DATA_COLUMNS_BY_RANGE_REQUEST,
parent = None,
skip_all,
level = "debug",
fields(peer_id = %peer_id, non_custody_indices = tracing::field::Empty, client = tracing::field::Empty)
)]
pub fn handle_data_columns_by_range_request(
&self,
peer_id: PeerId,
inbound_request_id: InboundRequestId,
req: DataColumnsByRangeRequest,
) {
let epoch = Slot::new(req.start_slot).epoch(T::EthSpec::slots_per_epoch());
self.record_data_column_request_in_span(
&peer_id,
&req.columns,
Some(epoch),
Span::current(),
);
self.terminate_response_stream(
peer_id,
inbound_request_id,
@@ -1181,4 +1297,29 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
}
fn record_data_column_request_in_span(
&self,
peer_id: &PeerId,
requested_indices: &[ColumnIndex],
epoch_opt: Option<Epoch>,
span: Span,
) {
let non_custody_indices = {
let custody_columns = self
.chain
.data_availability_checker
.custody_context()
.custody_columns_for_epoch(epoch_opt, &self.chain.spec);
requested_indices
.iter()
.filter(|subnet_id| !custody_columns.contains(subnet_id))
.collect::<Vec<_>>()
};
// This field is used to identify if peers are sending requests on columns we don't custody.
span.record("non_custody_indices", field::debug(non_custody_indices));
let client = self.network_globals.client(peer_id);
span.record("client", field::display(client.kind));
}
}

View File

@@ -18,10 +18,14 @@ use beacon_processor::{
};
use beacon_processor::{Work, WorkEvent};
use lighthouse_network::PeerAction;
use lighthouse_tracing::{
SPAN_PROCESS_CHAIN_SEGMENT, SPAN_PROCESS_RPC_BLOBS, SPAN_PROCESS_RPC_BLOCK,
SPAN_PROCESS_RPC_CUSTODY_COLUMNS,
};
use std::sync::Arc;
use std::time::Duration;
use store::KzgCommitment;
use tracing::{Span, debug, error, info, instrument, warn};
use tracing::{debug, error, info, instrument, warn};
use types::beacon_block_body::format_kzg_commitments;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{BlockImportSource, DataColumnSidecarList, Epoch, Hash256};
@@ -97,7 +101,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Attempt to process a block received from a direct RPC request.
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(?block_root), parent = None)]
#[instrument(
name = SPAN_PROCESS_RPC_BLOCK,
parent = None,
level = "debug",
skip_all,
fields(?block_root),
)]
pub async fn process_rpc_block(
self: Arc<NetworkBeaconProcessor<T>>,
block_root: Hash256,
@@ -244,7 +254,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
/// Attempt to process a list of blobs received from a direct RPC request.
#[instrument(skip_all, fields(?block_root, outcome = tracing::field::Empty), parent = None)]
#[instrument(
name = SPAN_PROCESS_RPC_BLOBS,
parent = None,
level = "debug",
skip_all,
fields(?block_root),
)]
pub async fn process_rpc_blobs(
self: Arc<NetworkBeaconProcessor<T>>,
block_root: Hash256,
@@ -293,7 +309,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
match &result {
Ok(AvailabilityProcessingStatus::Imported(hash)) => {
Span::current().record("outcome", "imported");
debug!(
result = "imported block and blobs",
%slot,
@@ -303,7 +318,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.chain.recompute_head_at_current_slot().await;
}
Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => {
Span::current().record("outcome", "missing_components");
debug!(
block_hash = %block_root,
%slot,
@@ -334,7 +348,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
});
}
#[instrument(skip_all, fields(?block_root), parent = None)]
#[instrument(
name = SPAN_PROCESS_RPC_CUSTODY_COLUMNS,
parent = None,
level = "debug",
skip_all,
fields(?block_root),
)]
pub async fn process_rpc_custody_columns(
self: Arc<NetworkBeaconProcessor<T>>,
block_root: Hash256,
@@ -420,7 +440,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
/// thread if more blocks are needed to process it.
#[instrument(skip_all, fields(sync_type = ?sync_type, downloaded_blocks = downloaded_blocks.len(), imported_blocks = tracing::field::Empty), parent = None)]
#[instrument(
name = SPAN_PROCESS_CHAIN_SEGMENT,
parent = None,
level = "debug",
skip_all,
fields(sync_type = ?sync_type, downloaded_blocks = downloaded_blocks.len())
)]
pub async fn process_chain_segment(
&self,
sync_type: ChainSegmentProcessId,
@@ -439,7 +465,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.await
{
(imported_blocks, Ok(_)) => {
Span::current().record("imported_blocks", imported_blocks);
debug!(
batch_epoch = %epoch,
first_block_slot = start_slot,
@@ -454,7 +479,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
(imported_blocks, Err(e)) => {
Span::current().record("imported_blocks", imported_blocks);
debug!(
batch_epoch = %epoch,
first_block_slot = start_slot,
@@ -490,7 +514,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
match self.process_backfill_blocks(downloaded_blocks) {
(imported_blocks, Ok(_)) => {
Span::current().record("imported_blocks", imported_blocks);
debug!(
batch_epoch = %epoch,
first_block_slot = start_slot,
@@ -532,7 +555,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
/// Helper function to process blocks batches which only consumes the chain and blocks to process.
#[instrument(skip_all, fields(result = tracing::field::Empty))]
#[instrument(skip_all)]
async fn process_blocks<'a>(
&self,
downloaded_blocks: impl Iterator<Item = &'a RpcBlock<T::EthSpec>>,
@@ -545,7 +568,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.await
{
ChainSegmentResult::Successful { imported_blocks } => {
Span::current().record("outcome", "success");
metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL);
if !imported_blocks.is_empty() {
self.chain.recompute_head_at_current_slot().await;
@@ -556,7 +578,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
imported_blocks,
error,
} => {
Span::current().record("outcome", "failed");
metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_FAILED_TOTAL);
let r = self.handle_failed_chain_segment(error);
if !imported_blocks.is_empty() {
@@ -568,6 +589,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
/// Helper function to process backfill block batches which only consumes the chain and blocks to process.
#[instrument(skip_all)]
fn process_backfill_blocks(
&self,
downloaded_blocks: Vec<RpcBlock<T::EthSpec>>,

View File

@@ -8,6 +8,7 @@ use lighthouse_network::{
},
};
use std::{collections::HashMap, sync::Arc};
use tracing::Span;
use types::{
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
Hash256, RuntimeVariableList, SignedBeaconBlock,
@@ -31,6 +32,8 @@ pub struct RangeBlockComponentsRequest<E: EthSpec> {
blocks_request: ByRangeRequest<BlocksByRangeRequestId, Vec<Arc<SignedBeaconBlock<E>>>>,
/// Sidecars we have received awaiting for their corresponding block.
block_data_request: RangeBlockDataRequest<E>,
/// Span to track the range request and all children range requests.
pub(crate) request_span: Span,
}
enum ByRangeRequest<I: PartialEq + std::fmt::Display, T> {
@@ -81,6 +84,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
Vec<(DataColumnsByRangeRequestId, Vec<ColumnIndex>)>,
Vec<ColumnIndex>,
)>,
request_span: Span,
) -> Self {
let block_data_request = if let Some(blobs_req_id) = blobs_req_id {
RangeBlockDataRequest::Blobs(ByRangeRequest::Active(blobs_req_id))
@@ -102,6 +106,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
Self {
blocks_request: ByRangeRequest::Active(blocks_req_id),
block_data_request,
request_span,
}
}
@@ -471,6 +476,7 @@ mod tests {
};
use rand::SeedableRng;
use std::sync::Arc;
use tracing::Span;
use types::{Epoch, ForkName, MinimalEthSpec as E, SignedBeaconBlock, test_utils::XorShiftRng};
fn components_id() -> ComponentsByRangeRequestId {
@@ -526,7 +532,8 @@ mod tests {
.collect::<Vec<Arc<SignedBeaconBlock<E>>>>();
let blocks_req_id = blocks_id(components_id());
let mut info = RangeBlockComponentsRequest::<E>::new(blocks_req_id, None, None);
let mut info =
RangeBlockComponentsRequest::<E>::new(blocks_req_id, None, None, Span::none());
// Send blocks and complete terminate response
info.add_blocks(blocks_req_id, blocks).unwrap();
@@ -556,8 +563,12 @@ mod tests {
let components_id = components_id();
let blocks_req_id = blocks_id(components_id);
let blobs_req_id = blobs_id(components_id);
let mut info =
RangeBlockComponentsRequest::<E>::new(blocks_req_id, Some(blobs_req_id), None);
let mut info = RangeBlockComponentsRequest::<E>::new(
blocks_req_id,
Some(blobs_req_id),
None,
Span::none(),
);
// Send blocks and complete terminate response
info.add_blocks(blocks_req_id, blocks).unwrap();
@@ -597,6 +608,7 @@ mod tests {
blocks_req_id,
None,
Some((columns_req_id.clone(), expects_custody_columns.clone())),
Span::none(),
);
// Send blocks and complete terminate response
info.add_blocks(
@@ -656,6 +668,7 @@ mod tests {
blocks_req_id,
None,
Some((columns_req_id.clone(), expects_custody_columns.clone())),
Span::none(),
);
let mut rng = XorShiftRng::from_seed([42; 16]);
@@ -735,6 +748,7 @@ mod tests {
blocks_req_id,
None,
Some((columns_req_id.clone(), expected_custody_columns.clone())),
Span::none(),
);
// AND: All blocks are received successfully
@@ -814,6 +828,7 @@ mod tests {
blocks_req_id,
None,
Some((columns_req_id.clone(), expected_custody_columns.clone())),
Span::none(),
);
// AND: All blocks are received
@@ -895,6 +910,7 @@ mod tests {
blocks_req_id,
None,
Some((columns_req_id.clone(), expected_custody_columns.clone())),
Span::none(),
);
// AND: All blocks are received

View File

@@ -29,6 +29,7 @@ use lighthouse_network::service::api_types::{
DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId,
};
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource};
use lighthouse_tracing::SPAN_OUTGOING_RANGE_REQUEST;
use parking_lot::RwLock;
pub use requests::LookupVerifyError;
use requests::{
@@ -45,7 +46,7 @@ use std::time::Duration;
#[cfg(test)]
use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tracing::{debug, error, warn};
use tracing::{Span, debug, debug_span, error, warn};
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, ForkContext,
@@ -55,6 +56,18 @@ use types::{
pub mod custody;
mod requests;
macro_rules! new_range_request_span {
($self:expr, $name:literal, $parent:expr, $peer_id:expr) => {{
let client = $self.client_type(&$peer_id).kind;
debug_span!(
parent: $parent,
$name,
peer_id = %$peer_id,
client = %client
)
}};
}
/// Max retries for block components after which we fail the batch.
pub const MAX_COLUMN_RETRIES: usize = 3;
@@ -444,10 +457,16 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request: BlocksByRangeRequest,
failed_columns: &HashSet<ColumnIndex>,
) -> Result<(), String> {
let Some(requester) = self
let Some((requester, parent_request_span)) = self
.components_by_range_requests
.keys()
.find_map(|r| if r.id == id { Some(r.requester) } else { None })
.iter()
.find_map(|(key, value)| {
if key.id == id {
Some((key.requester, value.request_span.clone()))
} else {
None
}
})
else {
return Err("request id not present".to_string());
};
@@ -485,6 +504,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
columns,
},
id,
new_range_request_span!(
self,
"outgoing_columns_by_range_retry",
parent_request_span.clone(),
peer_id
),
)
})
.collect::<Result<Vec<_>, _>>()
@@ -511,6 +536,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
peers: &HashSet<PeerId>,
peers_to_deprioritize: &HashSet<PeerId>,
) -> Result<Id, RpcRequestSendError> {
let range_request_span = debug_span!(
parent: None,
SPAN_OUTGOING_RANGE_REQUEST,
range_req_id = %requester,
peers = peers.len()
);
let _guard = range_request_span.clone().entered();
let active_request_count_by_peer = self.active_request_count_by_peer();
let Some(block_peer) = peers
@@ -561,7 +593,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
requester,
};
let blocks_req_id = self.send_blocks_by_range_request(block_peer, request.clone(), id)?;
let blocks_req_id = self.send_blocks_by_range_request(
block_peer,
request.clone(),
id,
new_range_request_span!(
self,
"outgoing_blocks_by_range",
range_request_span.clone(),
block_peer
),
)?;
let blobs_req_id = if matches!(batch_type, ByRangeRequestType::BlocksAndBlobs) {
Some(self.send_blobs_by_range_request(
@@ -571,6 +613,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
count: *request.count(),
},
id,
new_range_request_span!(
self,
"outgoing_blobs_by_range",
range_request_span.clone(),
block_peer
),
)?)
} else {
None
@@ -589,6 +637,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
columns,
},
id,
new_range_request_span!(
self,
"outgoing_columns_by_range",
range_request_span.clone(),
peer_id
),
)
})
.collect::<Result<Vec<_>, _>>()
@@ -605,6 +659,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.chain.sampling_columns_for_epoch(epoch).to_vec(),
)
}),
range_request_span,
);
self.components_by_range_requests.insert(id, info);
@@ -833,6 +888,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// block and the peer must have it.
true,
BlocksByRootRequestItems::new(request),
// Not implemented
Span::none(),
);
Ok(LookupRequestResult::RequestSent(id.req_id))
@@ -927,6 +984,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// have imported the block+blobs.
true,
BlobsByRootRequestItems::new(request),
// Not implemented
Span::none(),
);
Ok(LookupRequestResult::RequestSent(id.req_id))
@@ -970,6 +1029,9 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
peer_id,
expect_max_responses,
DataColumnsByRootRequestItems::new(request),
// Span is tracked in `self.custody_columns_by_root_requests` in the
// `ActiveCustodyRequest` struct.
Span::none(),
);
Ok(LookupRequestResult::RequestSent(id))
@@ -1065,6 +1127,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
peer_id: PeerId,
request: BlocksByRangeRequest,
parent_request_id: ComponentsByRangeRequestId,
request_span: Span,
) -> Result<BlocksByRangeRequestId, RpcRequestSendError> {
let id = BlocksByRangeRequestId {
id: self.next_id(),
@@ -1094,6 +1157,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// know if there are missed blocks.
false,
BlocksByRangeRequestItems::new(request),
request_span,
);
Ok(id)
}
@@ -1103,6 +1167,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
peer_id: PeerId,
request: BlobsByRangeRequest,
parent_request_id: ComponentsByRangeRequestId,
request_span: Span,
) -> Result<BlobsByRangeRequestId, RpcRequestSendError> {
let id = BlobsByRangeRequestId {
id: self.next_id(),
@@ -1136,6 +1201,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// know if there are missed blocks.
false,
BlobsByRangeRequestItems::new(request, max_blobs_per_block),
request_span,
);
Ok(id)
}
@@ -1145,6 +1211,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
peer_id: PeerId,
request: DataColumnsByRangeRequest,
parent_request_id: ComponentsByRangeRequestId,
request_span: Span,
) -> Result<(DataColumnsByRangeRequestId, Vec<u64>), RpcRequestSendError> {
let requested_columns = request.columns.clone();
let id = DataColumnsByRangeRequestId {
@@ -1177,6 +1244,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// know if there are missed blocks.
false,
DataColumnsByRangeRequestItems::new(request),
request_span,
);
Ok((id, requested_columns))
}

View File

@@ -6,13 +6,14 @@ use beacon_chain::validator_monitor::timestamp_now;
use fnv::FnvHashMap;
use lighthouse_network::PeerId;
use lighthouse_network::service::api_types::{CustodyId, DataColumnsByRootRequester};
use lighthouse_tracing::SPAN_OUTGOING_CUSTODY_REQUEST;
use lru_cache::LRUTimeCache;
use parking_lot::RwLock;
use rand::Rng;
use std::collections::HashSet;
use std::time::{Duration, Instant};
use std::{collections::HashMap, marker::PhantomData, sync::Arc};
use tracing::{debug, warn};
use tracing::{Span, debug, debug_span, field, warn};
use types::{DataColumnSidecar, Hash256, data_column_sidecar::ColumnIndex};
use types::{DataColumnSidecarList, EthSpec};
@@ -34,7 +35,8 @@ pub struct ActiveCustodyRequest<T: BeaconChainTypes> {
failed_peers: LRUTimeCache<PeerId>,
/// Set of peers that claim to have imported this block and their custody columns
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
/// Span for tracing the lifetime of this request.
span: Span,
_phantom: PhantomData<T>,
}
@@ -55,6 +57,8 @@ pub enum Error {
/// State for one in-flight data-columns-by-root batch request.
///
/// Created when a `DataColumnsByRoot` request is sent (see the
/// `LookupRequestResult::RequestSent` arm) and removed when the response,
/// stream termination, or error for that request id is processed.
struct ActiveBatchColumnsRequest {
    // Column indices requested in this batch; used to mark the matching
    // per-column requests as failed/downloaded when the batch resolves.
    indices: Vec<ColumnIndex>,
    /// Span for tracing the lifetime of this request.
    span: Span,
}
pub type CustodyRequestResult<E> =
@@ -67,6 +71,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
column_indices: &[ColumnIndex],
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
) -> Self {
let span = debug_span!(parent: None, SPAN_OUTGOING_CUSTODY_REQUEST, %block_root);
Self {
block_root,
custody_id,
@@ -78,6 +83,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
active_batch_columns_requests: <_>::default(),
failed_peers: LRUTimeCache::new(Duration::from_secs(FAILED_PEERS_CACHE_EXPIRY_SECONDS)),
lookup_peers,
span,
_phantom: PhantomData,
}
}
@@ -106,6 +112,8 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
return Ok(None);
};
let _guard = batch_request.span.clone().entered();
match resp {
Ok((data_columns, seen_timestamp)) => {
debug!(
@@ -163,6 +171,11 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
"Custody column peer claims to not have some data"
);
batch_request.span.record(
"missing_column_indexes",
field::debug(missing_column_indexes),
);
self.failed_peers.insert(peer_id);
}
}
@@ -183,6 +196,11 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
.on_download_error_and_mark_failure(req_id)?;
}
batch_request.span.record(
"missing_column_indexes",
field::debug(&batch_request.indices),
);
self.failed_peers.insert(peer_id);
}
};
@@ -194,6 +212,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
&mut self,
cx: &mut SyncNetworkContext<T>,
) -> CustodyRequestResult<T::EthSpec> {
let _guard = self.span.clone().entered();
if self.column_requests.values().all(|r| r.is_downloaded()) {
// All requests have completed successfully.
let mut peers = HashMap::<PeerId, Vec<usize>>::new();
@@ -298,6 +317,9 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
match request_result {
LookupRequestResult::RequestSent(req_id) => {
let client = cx.network_globals().client(&peer_id).kind;
let batch_columns_req_span = debug_span!("batch_columns_req", %peer_id, %client, missing_column_indexes = tracing::field::Empty);
let _guard = batch_columns_req_span.clone().entered();
for column_index in &indices {
let column_request = self
.column_requests
@@ -308,8 +330,13 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
column_request.on_download_start(req_id)?;
}
self.active_batch_columns_requests
.insert(req_id, ActiveBatchColumnsRequest { indices });
self.active_batch_columns_requests.insert(
req_id,
ActiveBatchColumnsRequest {
indices,
span: batch_columns_req_span,
},
);
}
LookupRequestResult::NoRequestNeeded(_) => unreachable!(),
LookupRequestResult::Pending(_) => unreachable!(),

View File

@@ -4,6 +4,7 @@ use beacon_chain::validator_monitor::timestamp_now;
use fnv::FnvHashMap;
use lighthouse_network::PeerId;
use strum::IntoStaticStr;
use tracing::Span;
use types::{Hash256, Slot};
pub use blobs_by_range::BlobsByRangeRequestItems;
@@ -50,6 +51,7 @@ struct ActiveRequest<T: ActiveRequestItems> {
peer_id: PeerId,
// Error if the request terminates before receiving max expected responses
expect_max_responses: bool,
span: Span,
}
enum State<T> {
@@ -66,13 +68,22 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
}
}
pub fn insert(&mut self, id: K, peer_id: PeerId, expect_max_responses: bool, items: T) {
pub fn insert(
&mut self,
id: K,
peer_id: PeerId,
expect_max_responses: bool,
items: T,
span: Span,
) {
let _guard = span.clone().entered();
self.requests.insert(
id,
ActiveRequest {
state: State::Active(items),
peer_id,
expect_max_responses,
span,
},
);
}
@@ -106,6 +117,7 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
// `ActiveRequestItems` validates the item before appending to its internal state.
RpcEvent::Response(item, seen_timestamp) => {
let request = &mut entry.get_mut();
let _guard = request.span.clone().entered();
match &mut request.state {
State::Active(items) => {
match items.add(item) {
@@ -141,6 +153,7 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
// After stream termination we must forget about this request, there will be no more
// messages coming from the network
let request = entry.remove();
let _guard = request.span.clone().entered();
match request.state {
// Received a stream termination in a valid sequence, consume items
State::Active(mut items) => {
@@ -162,7 +175,9 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
RpcEvent::RPCError(e) => {
// After an Error event from the network we must forget about this request as this
// may be the last message for this request.
match entry.remove().state {
let request = entry.remove();
let _guard = request.span.clone().entered();
match request.state {
// Received error while request is still active, propagate error.
State::Active(_) => Some(Err(e.into())),
// Received error after completing the request, ignore the error. This is okay

View File

@@ -9,10 +9,11 @@ use beacon_chain::BeaconChainTypes;
use beacon_chain::block_verification_types::RpcBlock;
use lighthouse_network::service::api_types::Id;
use lighthouse_network::{PeerAction, PeerId};
use lighthouse_tracing::SPAN_SYNCING_CHAIN;
use logging::crit;
use std::collections::{BTreeMap, HashSet, btree_map::Entry};
use strum::IntoStaticStr;
use tracing::{debug, warn};
use tracing::{Span, debug, instrument, warn};
use types::{ColumnIndex, Epoch, EthSpec, Hash256, Slot};
/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
@@ -111,6 +112,9 @@ pub struct SyncingChain<T: BeaconChainTypes> {
/// The current processing batch, if any.
current_processing_batch: Option<BatchId>,
/// The span to track the lifecycle of the syncing chain.
span: Span,
}
#[derive(PartialEq, Debug)]
@@ -123,6 +127,13 @@ pub enum ChainSyncingState {
impl<T: BeaconChainTypes> SyncingChain<T> {
#[allow(clippy::too_many_arguments)]
#[instrument(
name = SPAN_SYNCING_CHAIN,
parent = None,
level="debug",
skip(id),
fields(chain_id = %id)
)]
pub fn new(
id: Id,
start_epoch: Epoch,
@@ -131,6 +142,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
peer_id: PeerId,
chain_type: SyncingChainType,
) -> Self {
let span = Span::current();
SyncingChain {
id,
chain_type,
@@ -145,6 +157,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
attempted_optimistic_starts: HashSet::default(),
state: ChainSyncingState::Stopped,
current_processing_batch: None,
span,
}
}
@@ -186,6 +199,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// Removes a peer from the chain.
/// If the peer has active batches, those are considered failed and re-requested.
pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult {
let _guard = self.span.clone().entered();
debug!(peer = %peer_id, "Removing peer from chain");
self.peers.remove(peer_id);
if self.peers.is_empty() {
@@ -213,6 +228,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
request_id: Id,
blocks: Vec<RpcBlock<T::EthSpec>>,
) -> ProcessingResult {
let _guard = self.span.clone().entered();
// check if we have this batch
let batch = match self.batches.get_mut(&batch_id) {
None => {
@@ -242,7 +258,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let awaiting_batches = batch_id
.saturating_sub(self.optimistic_start.unwrap_or(self.processing_target))
/ EPOCHS_PER_BATCH;
debug!(epoch = %batch_id, blocks = received, batch_state = self.visualize_batch_state(), %awaiting_batches,"Batch downloaded");
debug!(
epoch = %batch_id,
blocks = received,
batch_state = self.visualize_batch_state(),
%awaiting_batches,
%peer_id,
"Batch downloaded"
);
// pre-emptively request more blocks from peers whilst we process current blocks,
self.request_batches(network)?;
@@ -415,6 +438,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
batch_id: BatchId,
result: &BatchProcessResult,
) -> ProcessingResult {
let _guard = self.span.clone().entered();
// the first two cases are possible if the chain advances while waiting for a processing
// result
let batch_state = self.visualize_batch_state();
@@ -754,6 +778,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
/// Transitions this chain into the `Stopped` state so no further batches are requested.
pub fn stop_syncing(&mut self) {
// Emit the event under this chain's tracing span so it is attributed to the
// correct syncing chain rather than the caller's current span.
debug!(parent: &self.span, "Stopping syncing");
self.state = ChainSyncingState::Stopped;
}
@@ -767,6 +792,12 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
local_finalized_epoch: Epoch,
optimistic_start_epoch: Epoch,
) -> ProcessingResult {
let _guard = self.span.clone().entered();
debug!(
?local_finalized_epoch,
?optimistic_start_epoch,
"Start syncing chain"
);
// to avoid dropping local progress, we advance the chain wrt its batch boundaries. This
let align = |epoch| {
// start_epoch + (number of batches in between)*length_of_batch
@@ -804,6 +835,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
network: &mut SyncNetworkContext<T>,
peer_id: PeerId,
) -> ProcessingResult {
let _guard = self.span.clone().entered();
debug!(peer_id = %peer_id, "Adding peer to chain");
self.peers.insert(peer_id);
self.request_batches(network)
}
@@ -819,6 +852,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
request_id: Id,
err: RpcResponseError,
) -> ProcessingResult {
let _guard = self.span.clone().entered();
let batch_state = self.visualize_batch_state();
if let Some(batch) = self.batches.get_mut(&batch_id) {
if let RpcResponseError::BlockComponentCouplingError(coupling_error) = &err {
@@ -911,6 +945,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
network: &mut SyncNetworkContext<T>,
batch_id: BatchId,
) -> ProcessingResult {
let _guard = self.span.clone().entered();
debug!(batch_epoch = %batch_id, "Requesting batch");
let batch_state = self.visualize_batch_state();
if let Some(batch) = self.batches.get_mut(&batch_id) {
let (request, batch_type) = batch.to_blocks_by_range_request();
@@ -981,7 +1017,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
/// Retries partial column requests within the batch by creating new requests for the failed columns.
pub fn retry_partial_batch(
fn retry_partial_batch(
&mut self,
network: &mut SyncNetworkContext<T>,
batch_id: BatchId,
@@ -989,6 +1025,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
failed_columns: HashSet<ColumnIndex>,
mut failed_peers: HashSet<PeerId>,
) -> ProcessingResult {
let _guard = self.span.clone().entered();
debug!(%batch_id, %id, ?failed_columns, "Retrying partial batch");
if let Some(batch) = self.batches.get_mut(&batch_id) {
failed_peers.extend(&batch.failed_peers());
let req = batch.to_blocks_by_range_request().0;
@@ -1037,6 +1075,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
&mut self,
network: &mut SyncNetworkContext<T>,
) -> Result<KeepChain, RemoveChain> {
let _guard = self.span.clone().entered();
debug!("Resuming chain");
// Request more batches if needed.
self.request_batches(network)?;
// If there is any batch ready for processing, send it.

View File

@@ -53,6 +53,7 @@ environment = { workspace = true }
eth2_network_config = { workspace = true }
ethereum_hashing = { workspace = true }
futures = { workspace = true }
lighthouse_tracing = { workspace = true }
lighthouse_version = { workspace = true }
logging = { workspace = true }
metrics = { workspace = true }