diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index cea06a28c8..0016f66c01 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -507,6 +507,30 @@ pub static SYNC_UNKNOWN_NETWORK_REQUESTS: LazyLock> = Lazy &["type"], ) }); +pub static SYNC_RPC_REQUEST_SUCCESSES: LazyLock> = LazyLock::new(|| { + try_create_int_counter_vec( + "sync_rpc_requests_success_total", + "Total count of sync RPC requests successes", + &["protocol"], + ) +}); +pub static SYNC_RPC_REQUEST_ERRORS: LazyLock> = LazyLock::new(|| { + try_create_int_counter_vec( + "sync_rpc_requests_error_total", + "Total count of sync RPC requests errors", + &["protocol", "error"], + ) +}); +pub static SYNC_RPC_REQUEST_TIME: LazyLock> = LazyLock::new(|| { + try_create_histogram_vec_with_buckets( + "sync_rpc_request_duration_sec", + "Time to complete a successful sync RPC requesst", + Ok(vec![ + 0.001, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 1.0, 2.0, + ]), + &["protocol"], + ) +}); /* * Block Delay Metrics diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 7f4da9c0da..542625b8a3 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1430,7 +1430,7 @@ impl SyncNetworkContext { } }) }); - self.on_rpc_response_result(id, "BlocksByRoot", resp, peer_id, |_| 1) + self.on_rpc_response_result(resp, peer_id) } pub(crate) fn on_single_blob_response( @@ -1459,7 +1459,7 @@ impl SyncNetworkContext { } }) }); - self.on_rpc_response_result(id, "BlobsByRoot", resp, peer_id, |_| 1) + self.on_rpc_response_result(resp, peer_id) } #[allow(clippy::type_complexity)] @@ -1472,7 +1472,7 @@ impl SyncNetworkContext { let resp = self .data_columns_by_root_requests .on_response(id, rpc_event); - self.on_rpc_response_result(id, "DataColumnsByRoot", resp, peer_id, |_| 1) + self.on_rpc_response_result(resp, peer_id) } #[allow(clippy::type_complexity)] @@ -1483,7 +1483,7 @@ impl SyncNetworkContext { rpc_event: RpcEvent>>, ) -> Option>>>> { let resp = self.blocks_by_range_requests.on_response(id, rpc_event); - self.on_rpc_response_result(id, "BlocksByRange", resp, peer_id, |b| b.len()) + self.on_rpc_response_result(resp, peer_id) } #[allow(clippy::type_complexity)] @@ -1494,7 +1494,7 @@ impl SyncNetworkContext { rpc_event: RpcEvent>>, ) -> Option>>>> { let resp = self.blobs_by_range_requests.on_response(id, rpc_event); - self.on_rpc_response_result(id, "BlobsByRangeRequest", resp, peer_id, |b| b.len()) + self.on_rpc_response_result(resp, peer_id) } #[allow(clippy::type_complexity)] @@ -1507,36 +1507,15 @@ impl SyncNetworkContext { let resp = self .data_columns_by_range_requests .on_response(id, rpc_event); - self.on_rpc_response_result(id, "DataColumnsByRange", resp, peer_id, |d| d.len()) + self.on_rpc_response_result(resp, peer_id) } - fn on_rpc_response_result usize>( + /// Common handler for consistent scoring of RpcResponseError + fn on_rpc_response_result( &mut self, - id: I, - method: &'static str, resp: Option>, peer_id: PeerId, - get_count: F, ) -> Option> { - match &resp { - None => {} - Some(Ok((v, _))) => { - debug!( - %id, - method, - count = get_count(v), - "Sync RPC request completed" - ); - } - Some(Err(e)) => { - debug!( - %id, - method, - error = ?e, - "Sync RPC request error" - ); - } - } if let Some(Err(RpcResponseError::VerifyError(e))) = &resp { self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 3183c06d76..8f9540693e 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -1,10 +1,11 @@ +use std::time::Instant; use std::{collections::hash_map::Entry, hash::Hash}; use beacon_chain::validator_monitor::timestamp_now; use fnv::FnvHashMap; use lighthouse_network::PeerId; use strum::IntoStaticStr; -use tracing::Span; +use tracing::{Span, debug}; use types::{Hash256, Slot}; pub use blobs_by_range::BlobsByRangeRequestItems; @@ -18,7 +19,7 @@ pub use data_columns_by_root::{ use crate::metrics; -use super::{RpcEvent, RpcResponseResult}; +use super::{RpcEvent, RpcResponseError, RpcResponseResult}; mod blobs_by_range; mod blobs_by_root; @@ -51,6 +52,7 @@ struct ActiveRequest { peer_id: PeerId, // Error if the request terminates before receiving max expected responses expect_max_responses: bool, + start_instant: Instant, span: Span, } @@ -60,7 +62,7 @@ enum State { Errored, } -impl ActiveRequests { +impl ActiveRequests { pub fn new(name: &'static str) -> Self { Self { requests: <_>::default(), @@ -83,6 +85,7 @@ impl ActiveRequests { state: State::Active(items), peer_id, expect_max_responses, + start_instant: Instant::now(), span, }, ); @@ -112,7 +115,7 @@ impl ActiveRequests { return None; }; - match rpc_event { + let result = match rpc_event { // Handler of a success ReqResp chunk. Adds the item to the request accumulator. // `ActiveRequestItems` validates the item before appending to its internal state. RpcEvent::Response(item, seen_timestamp) => { @@ -126,7 +129,7 @@ impl ActiveRequests { Ok(true) => { let items = items.consume(); request.state = State::CompletedEarly; - Some(Ok((items, seen_timestamp))) + Some(Ok((items, seen_timestamp, request.start_instant.elapsed()))) } // Received item, but we are still expecting more Ok(false) => None, @@ -163,7 +166,11 @@ impl ActiveRequests { } .into())) } else { - Some(Ok((items.consume(), timestamp_now()))) + Some(Ok(( + items.consume(), + timestamp_now(), + request.start_instant.elapsed(), + ))) } } // Items already returned, ignore stream termination @@ -188,7 +195,41 @@ impl ActiveRequests { State::Errored => None, } } - } + }; + + result.map(|result| match result { + Ok((items, seen_timestamp, duration)) => { + metrics::inc_counter_vec(&metrics::SYNC_RPC_REQUEST_SUCCESSES, &[self.name]); + metrics::observe_timer_vec(&metrics::SYNC_RPC_REQUEST_TIME, &[self.name], duration); + debug!( + %id, + method = self.name, + count = items.len(), + "Sync RPC request completed" + ); + + Ok((items, seen_timestamp)) + } + Err(e) => { + let err_str: &'static str = match &e { + RpcResponseError::RpcError(e) => e.into(), + RpcResponseError::VerifyError(e) => e.into(), + RpcResponseError::CustodyRequestError(_) => "CustodyRequestError", + RpcResponseError::BlockComponentCouplingError(_) => { + "BlockComponentCouplingError" + } + }; + metrics::inc_counter_vec(&metrics::SYNC_RPC_REQUEST_ERRORS, &[self.name, err_str]); + debug!( + %id, + method = self.name, + error = ?e, + "Sync RPC request error" + ); + + Err(e) + } + }) } pub fn active_requests_of_peer(&self, peer_id: &PeerId) -> Vec<&K> {