From 8d72cc34ebe3c94c0856a0c9534943179e11b39f Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Tue, 10 Feb 2026 16:40:01 -0700 Subject: [PATCH] Add sync request metrics (#7790) Add error rates metrics on unstable to benchmark against tree-sync. In my branch there are frequent errors but mostly connections errors as the node is still finding it set of stable peers. These metrics are very useful and unstable can benefit from them ahead of tree-sync Add three new metrics: - sync_rpc_requests_success_total: Total count of sync RPC requests successes - sync_rpc_requests_error_total: Total count of sync RPC requests errors - sync_rpc_request_duration_sec: Time to complete a successful sync RPC requesst Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com> --- beacon_node/network/src/metrics.rs | 24 ++++++++ .../network/src/sync/network_context.rs | 37 +++---------- .../src/sync/network_context/requests.rs | 55 ++++++++++++++++--- 3 files changed, 80 insertions(+), 36 deletions(-) diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index cea06a28c8..0016f66c01 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -507,6 +507,30 @@ pub static SYNC_UNKNOWN_NETWORK_REQUESTS: LazyLock> = Lazy &["type"], ) }); +pub static SYNC_RPC_REQUEST_SUCCESSES: LazyLock> = LazyLock::new(|| { + try_create_int_counter_vec( + "sync_rpc_requests_success_total", + "Total count of sync RPC requests successes", + &["protocol"], + ) +}); +pub static SYNC_RPC_REQUEST_ERRORS: LazyLock> = LazyLock::new(|| { + try_create_int_counter_vec( + "sync_rpc_requests_error_total", + "Total count of sync RPC requests errors", + &["protocol", "error"], + ) +}); +pub static SYNC_RPC_REQUEST_TIME: LazyLock> = LazyLock::new(|| { + try_create_histogram_vec_with_buckets( + "sync_rpc_request_duration_sec", + "Time to complete a successful sync RPC requesst", + Ok(vec![ + 0.001, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 1.0, 2.0, + ]), + &["protocol"], + ) +}); /* * Block Delay Metrics diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 7f4da9c0da..542625b8a3 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1430,7 +1430,7 @@ impl SyncNetworkContext { } }) }); - self.on_rpc_response_result(id, "BlocksByRoot", resp, peer_id, |_| 1) + self.on_rpc_response_result(resp, peer_id) } pub(crate) fn on_single_blob_response( @@ -1459,7 +1459,7 @@ impl SyncNetworkContext { } }) }); - self.on_rpc_response_result(id, "BlobsByRoot", resp, peer_id, |_| 1) + self.on_rpc_response_result(resp, peer_id) } #[allow(clippy::type_complexity)] @@ -1472,7 +1472,7 @@ impl SyncNetworkContext { let resp = self .data_columns_by_root_requests .on_response(id, rpc_event); - self.on_rpc_response_result(id, "DataColumnsByRoot", resp, peer_id, |_| 1) + self.on_rpc_response_result(resp, peer_id) } #[allow(clippy::type_complexity)] @@ -1483,7 +1483,7 @@ impl SyncNetworkContext { rpc_event: RpcEvent>>, ) -> Option>>>> { let resp = self.blocks_by_range_requests.on_response(id, rpc_event); - self.on_rpc_response_result(id, "BlocksByRange", resp, peer_id, |b| b.len()) + self.on_rpc_response_result(resp, peer_id) } #[allow(clippy::type_complexity)] @@ -1494,7 +1494,7 @@ impl SyncNetworkContext { rpc_event: RpcEvent>>, ) -> Option>>>> { let resp = self.blobs_by_range_requests.on_response(id, rpc_event); - self.on_rpc_response_result(id, "BlobsByRangeRequest", resp, peer_id, |b| b.len()) + self.on_rpc_response_result(resp, peer_id) } #[allow(clippy::type_complexity)] @@ -1507,36 +1507,15 @@ impl SyncNetworkContext { let resp = self .data_columns_by_range_requests .on_response(id, rpc_event); - self.on_rpc_response_result(id, "DataColumnsByRange", resp, peer_id, |d| d.len()) + self.on_rpc_response_result(resp, peer_id) } - fn on_rpc_response_result usize>( + /// Common handler for consistent scoring of RpcResponseError + fn on_rpc_response_result( &mut self, - id: I, - method: &'static str, resp: Option>, peer_id: PeerId, - get_count: F, ) -> Option> { - match &resp { - None => {} - Some(Ok((v, _))) => { - debug!( - %id, - method, - count = get_count(v), - "Sync RPC request completed" - ); - } - Some(Err(e)) => { - debug!( - %id, - method, - error = ?e, - "Sync RPC request error" - ); - } - } if let Some(Err(RpcResponseError::VerifyError(e))) = &resp { self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 3183c06d76..8f9540693e 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -1,10 +1,11 @@ +use std::time::Instant; use std::{collections::hash_map::Entry, hash::Hash}; use beacon_chain::validator_monitor::timestamp_now; use fnv::FnvHashMap; use lighthouse_network::PeerId; use strum::IntoStaticStr; -use tracing::Span; +use tracing::{Span, debug}; use types::{Hash256, Slot}; pub use blobs_by_range::BlobsByRangeRequestItems; @@ -18,7 +19,7 @@ pub use data_columns_by_root::{ use crate::metrics; -use super::{RpcEvent, RpcResponseResult}; +use super::{RpcEvent, RpcResponseError, RpcResponseResult}; mod blobs_by_range; mod blobs_by_root; @@ -51,6 +52,7 @@ struct ActiveRequest { peer_id: PeerId, // Error if the request terminates before receiving max expected responses expect_max_responses: bool, + start_instant: Instant, span: Span, } @@ -60,7 +62,7 @@ enum State { Errored, } -impl ActiveRequests { +impl ActiveRequests { pub fn new(name: &'static str) -> Self { Self { requests: <_>::default(), @@ -83,6 +85,7 @@ impl ActiveRequests { state: State::Active(items), peer_id, expect_max_responses, + start_instant: Instant::now(), span, }, ); @@ -112,7 +115,7 @@ impl ActiveRequests { return None; }; - match rpc_event { + let result = match rpc_event { // Handler of a success ReqResp chunk. Adds the item to the request accumulator. // `ActiveRequestItems` validates the item before appending to its internal state. RpcEvent::Response(item, seen_timestamp) => { @@ -126,7 +129,7 @@ impl ActiveRequests { Ok(true) => { let items = items.consume(); request.state = State::CompletedEarly; - Some(Ok((items, seen_timestamp))) + Some(Ok((items, seen_timestamp, request.start_instant.elapsed()))) } // Received item, but we are still expecting more Ok(false) => None, @@ -163,7 +166,11 @@ impl ActiveRequests { } .into())) } else { - Some(Ok((items.consume(), timestamp_now()))) + Some(Ok(( + items.consume(), + timestamp_now(), + request.start_instant.elapsed(), + ))) } } // Items already returned, ignore stream termination @@ -188,7 +195,41 @@ impl ActiveRequests { State::Errored => None, } } - } + }; + + result.map(|result| match result { + Ok((items, seen_timestamp, duration)) => { + metrics::inc_counter_vec(&metrics::SYNC_RPC_REQUEST_SUCCESSES, &[self.name]); + metrics::observe_timer_vec(&metrics::SYNC_RPC_REQUEST_TIME, &[self.name], duration); + debug!( + %id, + method = self.name, + count = items.len(), + "Sync RPC request completed" + ); + + Ok((items, seen_timestamp)) + } + Err(e) => { + let err_str: &'static str = match &e { + RpcResponseError::RpcError(e) => e.into(), + RpcResponseError::VerifyError(e) => e.into(), + RpcResponseError::CustodyRequestError(_) => "CustodyRequestError", + RpcResponseError::BlockComponentCouplingError(_) => { + "BlockComponentCouplingError" + } + }; + metrics::inc_counter_vec(&metrics::SYNC_RPC_REQUEST_ERRORS, &[self.name, err_str]); + debug!( + %id, + method = self.name, + error = ?e, + "Sync RPC request error" + ); + + Err(e) + } + }) } pub fn active_requests_of_peer(&self, peer_id: &PeerId) -> Vec<&K> {