Add sync batch state metrics (#8847)

Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>
This commit is contained in:
Jimmy Chen
2026-02-19 12:57:53 +11:00
committed by GitHub
parent 561898fc1c
commit 4588971085
8 changed files with 108 additions and 5 deletions

View File

@@ -462,6 +462,13 @@ pub static SYNCING_CHAIN_BATCH_AWAITING_PROCESSING: LazyLock<Result<Histogram>>
]), ]),
) )
}); });
/// Gauge of sync batch counts, labelled by sync type and batch state.
pub static SYNCING_CHAIN_BATCHES: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {
    // Label order must match the `&[sync_type, state]` arrays at every call site.
    let labels: &[&str] = &["sync_type", "state"];
    try_create_int_gauge_vec(
        "sync_batches",
        "Number of batches in sync chains by sync type and state",
        labels,
    )
});
pub static SYNC_SINGLE_BLOCK_LOOKUPS: LazyLock<Result<IntGauge>> = LazyLock::new(|| { pub static SYNC_SINGLE_BLOCK_LOOKUPS: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
try_create_int_gauge( try_create_int_gauge(
"sync_single_block_lookups", "sync_single_block_lookups",

View File

@@ -8,9 +8,11 @@
//! If a batch fails, the backfill sync cannot progress. In this scenario, we mark the backfill //! If a batch fails, the backfill sync cannot progress. In this scenario, we mark the backfill
//! sync as failed, log an error and attempt to retry once a new peer joins the node. //! sync as failed, log an error and attempt to retry once a new peer joins the node.
use crate::metrics;
use crate::network_beacon_processor::ChainSegmentProcessId; use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::batch::{ use crate::sync::batch::{
BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, BatchConfig, BatchId, BatchInfo, BatchMetricsState, BatchOperationOutcome,
BatchProcessingResult, BatchState,
}; };
use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::block_sidecar_coupling::CouplingError;
use crate::sync::manager::BatchProcessResult; use crate::sync::manager::BatchProcessResult;
@@ -31,6 +33,7 @@ use std::collections::{
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
use strum::IntoEnumIterator;
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
use types::{ColumnIndex, Epoch, EthSpec}; use types::{ColumnIndex, Epoch, EthSpec};
@@ -1181,6 +1184,21 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
.epoch(T::EthSpec::slots_per_epoch()) .epoch(T::EthSpec::slots_per_epoch())
} }
/// Record the per-state batch counts of this backfill sync as gauge metrics.
///
/// Every `BatchMetricsState` label is written on each call (including zeroes),
/// so gauges for states that have emptied out are reset rather than left stale.
pub fn register_metrics(&self) {
    for state in BatchMetricsState::iter() {
        // Tally the batches currently sitting in `state`.
        let in_state = self.batches.values().fold(0_i64, |acc, batch| {
            if batch.state().metrics_state() == state {
                acc + 1
            } else {
                acc
            }
        });
        metrics::set_gauge_vec(
            &metrics::SYNCING_CHAIN_BATCHES,
            &["backfill", state.into()],
            in_state,
        );
    }
}
/// Updates the global network state indicating the current state of a backfill sync. /// Updates the global network state indicating the current state of a backfill sync.
fn set_state(&self, state: BackFillState) { fn set_state(&self, state: BackFillState) {
*self.network_globals.backfill_state.write() = state; *self.network_globals.backfill_state.write() = state;

View File

@@ -10,10 +10,22 @@ use std::marker::PhantomData;
use std::ops::Sub; use std::ops::Sub;
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
use strum::Display; use strum::{Display, EnumIter, IntoStaticStr};
use types::Slot; use types::Slot;
use types::{DataColumnSidecarList, Epoch, EthSpec}; use types::{DataColumnSidecarList, Epoch, EthSpec};
/// Batch states used as metrics labels.
///
/// A flattened, payload-free mirror of `BatchState` (see
/// `BatchState::metrics_state`), so states can be enumerated with
/// `strum::EnumIter` and rendered as `snake_case` label strings via
/// `IntoStaticStr`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumIter, IntoStaticStr)]
#[strum(serialize_all = "snake_case")]
pub enum BatchMetricsState {
    /// Mirrors `BatchState::AwaitingDownload`.
    AwaitingDownload,
    /// Mirrors `BatchState::Downloading`.
    Downloading,
    /// Mirrors `BatchState::AwaitingProcessing`.
    AwaitingProcessing,
    /// Mirrors `BatchState::Processing`.
    Processing,
    /// Mirrors `BatchState::AwaitingValidation`.
    AwaitingValidation,
    /// Mirrors both `BatchState::Failed` and `BatchState::Poisoned`.
    Failed,
}
pub type BatchId = Epoch; pub type BatchId = Epoch;
/// Type of expected batch. /// Type of expected batch.
@@ -142,6 +154,18 @@ impl<D: Hash> BatchState<D> {
pub fn poison(&mut self) -> BatchState<D> { pub fn poison(&mut self) -> BatchState<D> {
std::mem::replace(self, BatchState::Poisoned) std::mem::replace(self, BatchState::Poisoned)
} }
/// Returns the metrics state for this batch.
pub fn metrics_state(&self) -> BatchMetricsState {
match self {
BatchState::AwaitingDownload => BatchMetricsState::AwaitingDownload,
BatchState::Downloading(_) => BatchMetricsState::Downloading,
BatchState::AwaitingProcessing(..) => BatchMetricsState::AwaitingProcessing,
BatchState::Processing(_) => BatchMetricsState::Processing,
BatchState::AwaitingValidation(_) => BatchMetricsState::AwaitingValidation,
BatchState::Poisoned | BatchState::Failed => BatchMetricsState::Failed,
}
}
} }
impl<E: EthSpec, B: BatchConfig, D: Hash> BatchInfo<E, B, D> { impl<E: EthSpec, B: BatchConfig, D: Hash> BatchInfo<E, B, D> {

View File

@@ -12,14 +12,16 @@ use lighthouse_network::{
}; };
use logging::crit; use logging::crit;
use std::hash::{DefaultHasher, Hash, Hasher}; use std::hash::{DefaultHasher, Hash, Hasher};
use strum::IntoEnumIterator;
use tracing::{debug, error, info, info_span, warn}; use tracing::{debug, error, info, info_span, warn};
use types::{DataColumnSidecarList, Epoch, EthSpec}; use types::{DataColumnSidecarList, Epoch, EthSpec};
use crate::metrics;
use crate::sync::{ use crate::sync::{
backfill_sync::{BACKFILL_EPOCHS_PER_BATCH, ProcessResult, SyncStart}, backfill_sync::{BACKFILL_EPOCHS_PER_BATCH, ProcessResult, SyncStart},
batch::{ batch::{
BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, BatchConfig, BatchId, BatchInfo, BatchMetricsState, BatchOperationOutcome,
ByRangeRequestType, BatchProcessingResult, BatchState, ByRangeRequestType,
}, },
block_sidecar_coupling::CouplingError, block_sidecar_coupling::CouplingError,
manager::CustodyBatchProcessResult, manager::CustodyBatchProcessResult,
@@ -1114,6 +1116,21 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
*self.network_globals.custody_sync_state.write() = state; *self.network_globals.custody_sync_state.write() = state;
} }
/// Record the per-state batch counts of this custody backfill sync as gauges.
///
/// All `BatchMetricsState` labels are refreshed on every call, so counts that
/// drop to zero are overwritten instead of going stale.
pub fn register_metrics(&self) {
    const SYNC_TYPE: &str = "custody_backfill";
    for state in BatchMetricsState::iter() {
        let in_state = self
            .batches
            .values()
            .filter(|batch| batch.state().metrics_state() == state)
            .count();
        metrics::set_gauge_vec(
            &metrics::SYNCING_CHAIN_BATCHES,
            &[SYNC_TYPE, state.into()],
            in_state as i64,
        );
    }
}
/// A fully synced peer has joined us. /// A fully synced peer has joined us.
/// If we are in a failed state, update a local variable to indicate we are able to restart /// If we are in a failed state, update a local variable to indicate we are able to restart
/// the failed sync on the next attempt. /// the failed sync on the next attempt.

View File

@@ -784,6 +784,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} }
_ = register_metrics_interval.tick() => { _ = register_metrics_interval.tick() => {
self.network.register_metrics(); self.network.register_metrics();
self.range_sync.register_metrics();
self.backfill_sync.register_metrics();
self.custody_backfill_sync.register_metrics();
} }
_ = epoch_interval.tick() => { _ = epoch_interval.tick() => {
self.update_sync_state(); self.update_sync_state();

View File

@@ -3,7 +3,8 @@ use crate::metrics;
use crate::network_beacon_processor::ChainSegmentProcessId; use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::batch::BatchId; use crate::sync::batch::BatchId;
use crate::sync::batch::{ use crate::sync::batch::{
BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, BatchConfig, BatchInfo, BatchMetricsState, BatchOperationOutcome, BatchProcessingResult,
BatchState,
}; };
use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::block_sidecar_coupling::CouplingError;
use crate::sync::network_context::{RangeRequestId, RpcRequestSendError, RpcResponseError}; use crate::sync::network_context::{RangeRequestId, RpcRequestSendError, RpcResponseError};
@@ -234,6 +235,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
.sum() .sum()
} }
/// Returns how many of this chain's batches map to the given metrics state.
pub fn count_batches_in_state(&self, state: BatchMetricsState) -> usize {
    self.batches
        .values()
        .map(|batch| usize::from(batch.state().metrics_state() == state))
        .sum()
}
/// Removes a peer from the chain. /// Removes a peer from the chain.
/// If the peer has active batches, those are considered failed and re-requested. /// If the peer has active batches, those are considered failed and re-requested.
pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult { pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult {

View File

@@ -6,6 +6,7 @@
use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain}; use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain};
use super::sync_type::RangeSyncType; use super::sync_type::RangeSyncType;
use crate::metrics; use crate::metrics;
use crate::sync::batch::BatchMetricsState;
use crate::sync::network_context::SyncNetworkContext; use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_chain::{BeaconChain, BeaconChainTypes};
use fnv::FnvHashMap; use fnv::FnvHashMap;
@@ -17,6 +18,7 @@ use smallvec::SmallVec;
use std::collections::HashMap; use std::collections::HashMap;
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::sync::Arc; use std::sync::Arc;
use strum::IntoEnumIterator;
use tracing::{debug, error}; use tracing::{debug, error};
use types::EthSpec; use types::EthSpec;
use types::{Epoch, Hash256, Slot}; use types::{Epoch, Hash256, Slot};
@@ -516,6 +518,25 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
} }
} }
/// Record per-state batch counts as gauges, split by range-sync chain kind.
///
/// Counts are summed across all chains of each kind, and every
/// `(sync_type, state)` label pair is written on each call so gauges for
/// emptied states reset to zero instead of going stale.
pub fn register_metrics(&self) {
    for state in BatchMetricsState::iter() {
        for (sync_type, chains) in [
            ("range_finalized", &self.finalized_chains),
            ("range_head", &self.head_chains),
        ] {
            let total: usize = chains
                .values()
                .map(|chain| chain.count_batches_in_state(state))
                .sum();
            metrics::set_gauge_vec(
                &metrics::SYNCING_CHAIN_BATCHES,
                &[sync_type, state.into()],
                total as i64,
            );
        }
    }
}
fn update_metrics(&self) { fn update_metrics(&self) {
metrics::set_gauge_vec( metrics::set_gauge_vec(
&metrics::SYNCING_CHAINS_COUNT, &metrics::SYNCING_CHAINS_COUNT,

View File

@@ -371,6 +371,10 @@ where
.update(network, &local, &mut self.awaiting_head_peers); .update(network, &local, &mut self.awaiting_head_peers);
} }
/// Record range-sync batch metrics by delegating to the chain collection.
pub fn register_metrics(&self) {
self.chains.register_metrics();
}
/// Kickstarts sync. /// Kickstarts sync.
pub fn resume(&mut self, network: &mut SyncNetworkContext<T>) { pub fn resume(&mut self, network: &mut SyncNetworkContext<T>) {
for (removed_chain, sync_type, remove_reason) in for (removed_chain, sync_type, remove_reason) in