Add sync request metrics (#7790)

Add error rate metrics on unstable to benchmark against tree-sync. In my branch there are frequent errors, but mostly connection errors as the node is still finding its set of stable peers.

These metrics are very useful and unstable can benefit from them ahead of tree-sync


  Add three new metrics:
- sync_rpc_requests_success_total: Total count of sync RPC requests successes
- sync_rpc_requests_error_total: Total count of sync RPC requests errors
- sync_rpc_request_duration_sec: Time to complete a successful sync RPC request


Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com>
This commit is contained in:
Lion - dapplion
2026-02-10 16:40:01 -07:00
committed by GitHub
parent 889946c04b
commit 8d72cc34eb
3 changed files with 80 additions and 36 deletions

View File

@@ -507,6 +507,30 @@ pub static SYNC_UNKNOWN_NETWORK_REQUESTS: LazyLock<Result<IntCounterVec>> = Lazy
&["type"],
)
});
/// Total count of sync RPC requests that completed successfully,
/// labelled by ReqResp protocol name (e.g. "blocks_by_range").
// Lazily registered; `Result` is Err if a metric with this name is
// already registered in the global registry.
pub static SYNC_RPC_REQUEST_SUCCESSES: LazyLock<Result<IntCounterVec>> = LazyLock::new(|| {
try_create_int_counter_vec(
"sync_rpc_requests_success_total",
"Total count of sync RPC requests successes",
&["protocol"],
)
});
/// Total count of sync RPC requests that terminated with an error,
/// labelled by ReqResp protocol name and a static error-kind string.
// NOTE(review): the "error" label should stay low-cardinality — only
// static strings derived from the error enum are used as values.
pub static SYNC_RPC_REQUEST_ERRORS: LazyLock<Result<IntCounterVec>> = LazyLock::new(|| {
try_create_int_counter_vec(
"sync_rpc_requests_error_total",
"Total count of sync RPC requests errors",
&["protocol", "error"],
)
});
/// Histogram of the time taken to complete a successful sync RPC
/// request, labelled by ReqResp protocol name.
///
/// Buckets span 1 ms to 2 s; requests slower than the top bucket are
/// still counted in the implicit `+Inf` bucket.
pub static SYNC_RPC_REQUEST_TIME: LazyLock<Result<HistogramVec>> = LazyLock::new(|| {
    try_create_histogram_vec_with_buckets(
        "sync_rpc_request_duration_sec",
        // Fixed typo in the exported help text: "requesst" -> "request".
        "Time to complete a successful sync RPC request",
        Ok(vec![
            0.001, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 1.0, 2.0,
        ]),
        &["protocol"],
    )
});
/*
* Block Delay Metrics

View File

@@ -1430,7 +1430,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
})
});
self.on_rpc_response_result(id, "BlocksByRoot", resp, peer_id, |_| 1)
self.on_rpc_response_result(resp, peer_id)
}
pub(crate) fn on_single_blob_response(
@@ -1459,7 +1459,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
})
});
self.on_rpc_response_result(id, "BlobsByRoot", resp, peer_id, |_| 1)
self.on_rpc_response_result(resp, peer_id)
}
#[allow(clippy::type_complexity)]
@@ -1472,7 +1472,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let resp = self
.data_columns_by_root_requests
.on_response(id, rpc_event);
self.on_rpc_response_result(id, "DataColumnsByRoot", resp, peer_id, |_| 1)
self.on_rpc_response_result(resp, peer_id)
}
#[allow(clippy::type_complexity)]
@@ -1483,7 +1483,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
rpc_event: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
) -> Option<RpcResponseResult<Vec<Arc<SignedBeaconBlock<T::EthSpec>>>>> {
let resp = self.blocks_by_range_requests.on_response(id, rpc_event);
self.on_rpc_response_result(id, "BlocksByRange", resp, peer_id, |b| b.len())
self.on_rpc_response_result(resp, peer_id)
}
#[allow(clippy::type_complexity)]
@@ -1494,7 +1494,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
rpc_event: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
) -> Option<RpcResponseResult<Vec<Arc<BlobSidecar<T::EthSpec>>>>> {
let resp = self.blobs_by_range_requests.on_response(id, rpc_event);
self.on_rpc_response_result(id, "BlobsByRangeRequest", resp, peer_id, |b| b.len())
self.on_rpc_response_result(resp, peer_id)
}
#[allow(clippy::type_complexity)]
@@ -1507,36 +1507,15 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let resp = self
.data_columns_by_range_requests
.on_response(id, rpc_event);
self.on_rpc_response_result(id, "DataColumnsByRange", resp, peer_id, |d| d.len())
self.on_rpc_response_result(resp, peer_id)
}
fn on_rpc_response_result<I: std::fmt::Display, R, F: FnOnce(&R) -> usize>(
/// Common handler for consistent scoring of RpcResponseError
fn on_rpc_response_result<R>(
&mut self,
id: I,
method: &'static str,
resp: Option<RpcResponseResult<R>>,
peer_id: PeerId,
get_count: F,
) -> Option<RpcResponseResult<R>> {
match &resp {
None => {}
Some(Ok((v, _))) => {
debug!(
%id,
method,
count = get_count(v),
"Sync RPC request completed"
);
}
Some(Err(e)) => {
debug!(
%id,
method,
error = ?e,
"Sync RPC request error"
);
}
}
if let Some(Err(RpcResponseError::VerifyError(e))) = &resp {
self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
}

View File

@@ -1,10 +1,11 @@
use std::time::Instant;
use std::{collections::hash_map::Entry, hash::Hash};
use beacon_chain::validator_monitor::timestamp_now;
use fnv::FnvHashMap;
use lighthouse_network::PeerId;
use strum::IntoStaticStr;
use tracing::Span;
use tracing::{Span, debug};
use types::{Hash256, Slot};
pub use blobs_by_range::BlobsByRangeRequestItems;
@@ -18,7 +19,7 @@ pub use data_columns_by_root::{
use crate::metrics;
use super::{RpcEvent, RpcResponseResult};
use super::{RpcEvent, RpcResponseError, RpcResponseResult};
mod blobs_by_range;
mod blobs_by_root;
@@ -51,6 +52,7 @@ struct ActiveRequest<T: ActiveRequestItems> {
peer_id: PeerId,
// Error if the request terminates before receiving max expected responses
expect_max_responses: bool,
start_instant: Instant,
span: Span,
}
@@ -60,7 +62,7 @@ enum State<T> {
Errored,
}
impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
impl<K: Copy + Eq + Hash + std::fmt::Display, T: ActiveRequestItems> ActiveRequests<K, T> {
pub fn new(name: &'static str) -> Self {
Self {
requests: <_>::default(),
@@ -83,6 +85,7 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
state: State::Active(items),
peer_id,
expect_max_responses,
start_instant: Instant::now(),
span,
},
);
@@ -112,7 +115,7 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
return None;
};
match rpc_event {
let result = match rpc_event {
// Handler of a success ReqResp chunk. Adds the item to the request accumulator.
// `ActiveRequestItems` validates the item before appending to its internal state.
RpcEvent::Response(item, seen_timestamp) => {
@@ -126,7 +129,7 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
Ok(true) => {
let items = items.consume();
request.state = State::CompletedEarly;
Some(Ok((items, seen_timestamp)))
Some(Ok((items, seen_timestamp, request.start_instant.elapsed())))
}
// Received item, but we are still expecting more
Ok(false) => None,
@@ -163,7 +166,11 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
}
.into()))
} else {
Some(Ok((items.consume(), timestamp_now())))
Some(Ok((
items.consume(),
timestamp_now(),
request.start_instant.elapsed(),
)))
}
}
// Items already returned, ignore stream termination
@@ -188,7 +195,41 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
State::Errored => None,
}
}
}
};
result.map(|result| match result {
Ok((items, seen_timestamp, duration)) => {
metrics::inc_counter_vec(&metrics::SYNC_RPC_REQUEST_SUCCESSES, &[self.name]);
metrics::observe_timer_vec(&metrics::SYNC_RPC_REQUEST_TIME, &[self.name], duration);
debug!(
%id,
method = self.name,
count = items.len(),
"Sync RPC request completed"
);
Ok((items, seen_timestamp))
}
Err(e) => {
let err_str: &'static str = match &e {
RpcResponseError::RpcError(e) => e.into(),
RpcResponseError::VerifyError(e) => e.into(),
RpcResponseError::CustodyRequestError(_) => "CustodyRequestError",
RpcResponseError::BlockComponentCouplingError(_) => {
"BlockComponentCouplingError"
}
};
metrics::inc_counter_vec(&metrics::SYNC_RPC_REQUEST_ERRORS, &[self.name, err_str]);
debug!(
%id,
method = self.name,
error = ?e,
"Sync RPC request error"
);
Err(e)
}
})
}
pub fn active_requests_of_peer(&self, peer_id: &PeerId) -> Vec<&K> {