diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 32e57da8ae..dbcc8fb9b4 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -262,6 +262,16 @@ lazy_static! { "sync_lookups_stuck_total", "Total count of sync lookups that are stuck and dropped", ); + pub static ref SYNC_ACTIVE_NETWORK_REQUESTS: Result = try_create_int_gauge_vec( + "sync_active_network_requests", + "Current count of active network requests from sync", + &["type"], + ); + pub static ref SYNC_UNKNOWN_NETWORK_REQUESTS: Result = try_create_int_counter_vec( + "sync_unknwon_network_request", + "Total count of network messages received for unknown active requests", + &["type"], + ); /* * Block Delay Metrics diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index ee538e8e28..23c05a6e16 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -570,6 +570,8 @@ impl SyncManager { // unless there is a bug. let mut prune_lookups_interval = tokio::time::interval(Duration::from_secs(15)); + let mut register_metrics_interval = tokio::time::interval(Duration::from_secs(5)); + // process any inbound messages loop { tokio::select! { @@ -582,6 +584,9 @@ impl SyncManager { _ = prune_lookups_interval.tick() => { self.block_lookups.prune_lookups(); } + _ = register_metrics_interval.tick() => { + self.network.register_metrics(); + } } } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 33d56ae87e..df8be9f6d5 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -5,6 +5,7 @@ use self::requests::{ActiveBlobsByRootRequest, ActiveBlocksByRootRequest}; pub use self::requests::{BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest}; use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo; use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; +use crate::metrics; use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; @@ -348,27 +349,28 @@ impl SyncNetworkContext { request_id: Id, block_or_blob: BlockOrBlob, ) -> Option> { - match self.range_blocks_and_blobs_requests.entry(request_id) { - Entry::Occupied(mut entry) => { - let (_, info) = entry.get_mut(); - match block_or_blob { - BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), - BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), - } - if info.is_finished() { - // If the request is finished, dequeue everything - let (sender_id, info) = entry.remove(); - let request_type = info.get_request_type(); - Some(BlocksAndBlobsByRangeResponse { - sender_id, - request_type, - responses: info.into_responses(), - }) - } else { - None - } - } - Entry::Vacant(_) => None, + let Entry::Occupied(mut entry) = self.range_blocks_and_blobs_requests.entry(request_id) + else { + metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["range_blocks"]); + return None; + }; + + let (_, info) = entry.get_mut(); + match block_or_blob { + BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), + BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), + } + if info.is_finished() { + // If the request is finished, dequeue everything + let (sender_id, info) = entry.remove(); + let request_type = info.get_request_type(); + Some(BlocksAndBlobsByRangeResponse { + sender_id, + request_type, + responses: info.into_responses(), + }) + } else { + None } } @@ -631,6 +633,7 @@ impl SyncNetworkContext { block: RpcEvent>>, ) -> Option>>> { let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else { + metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["blocks_by_root"]); return None; }; @@ -668,6 +671,7 @@ impl SyncNetworkContext { blob: RpcEvent>>, ) -> Option>> { let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else { + metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["blobs_by_root"]); return None; }; @@ -771,6 +775,24 @@ impl SyncNetworkContext { SendErrorProcessor::SendError }) } + + pub(crate) fn register_metrics(&self) { + metrics::set_gauge_vec( + &metrics::SYNC_ACTIVE_NETWORK_REQUESTS, + &["blocks_by_root"], + self.blocks_by_root_requests.len() as i64, + ); + metrics::set_gauge_vec( + &metrics::SYNC_ACTIVE_NETWORK_REQUESTS, + &["blobs_by_root"], + self.blobs_by_root_requests.len() as i64, + ); + metrics::set_gauge_vec( + &metrics::SYNC_ACTIVE_NETWORK_REQUESTS, + &["range_blocks"], + self.range_blocks_and_blobs_requests.len() as i64, + ); + } } fn to_fixed_blob_sidecar_list(