mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-14 10:22:38 +00:00
Additional networking metrics (#2549)
Adds additional metrics for network monitoring and evaluation. Co-authored-by: Mark Mackey <mark@sigmaprime.io>
This commit is contained in:
@@ -8,18 +8,19 @@ use crate::peer_manager::{
|
||||
ConnectionDirection, PeerManager, PeerManagerEvent,
|
||||
};
|
||||
use crate::rpc::*;
|
||||
use crate::service::METADATA_FILENAME;
|
||||
use crate::service::{Context as ServiceContext, METADATA_FILENAME};
|
||||
use crate::types::{
|
||||
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet,
|
||||
SubnetDiscovery,
|
||||
};
|
||||
use crate::Eth2Enr;
|
||||
use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
|
||||
use crate::{error, metrics, Enr, NetworkGlobals, PubsubMessage, TopicHash};
|
||||
use libp2p::{
|
||||
core::{
|
||||
connection::ConnectionId, identity::Keypair, multiaddr::Protocol as MProtocol, Multiaddr,
|
||||
},
|
||||
gossipsub::{
|
||||
metrics::Config as GossipsubMetricsConfig,
|
||||
subscription_filter::{MaxCountSubscriptionFilter, WhitelistSubscriptionFilter},
|
||||
Gossipsub as BaseGossipsub, GossipsubEvent, IdentTopic as Topic, MessageAcceptance,
|
||||
MessageAuthenticity, MessageId,
|
||||
@@ -45,7 +46,7 @@ use std::{
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use types::{
|
||||
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, ChainSpec, EnrForkId, EthSpec, ForkContext,
|
||||
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext,
|
||||
SignedBeaconBlock, Slot, SubnetId, SyncSubnetId,
|
||||
};
|
||||
|
||||
@@ -182,14 +183,14 @@ pub struct Behaviour<TSpec: EthSpec> {
|
||||
impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
pub async fn new(
|
||||
local_key: &Keypair,
|
||||
mut config: NetworkConfig,
|
||||
ctx: ServiceContext<'_>,
|
||||
network_globals: Arc<NetworkGlobals<TSpec>>,
|
||||
log: &slog::Logger,
|
||||
fork_context: Arc<ForkContext>,
|
||||
chain_spec: &ChainSpec,
|
||||
) -> error::Result<Self> {
|
||||
let behaviour_log = log.new(o!());
|
||||
|
||||
let mut config = ctx.config.clone();
|
||||
|
||||
// Set up the Identify Behaviour
|
||||
let identify_config = if config.private {
|
||||
IdentifyConfig::new(
|
||||
@@ -215,25 +216,29 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
.eth2()
|
||||
.expect("Local ENR must have a fork id");
|
||||
|
||||
let possible_fork_digests = fork_context.all_fork_digests();
|
||||
let possible_fork_digests = ctx.fork_context.all_fork_digests();
|
||||
let filter = MaxCountSubscriptionFilter {
|
||||
filter: Self::create_whitelist_filter(
|
||||
possible_fork_digests,
|
||||
chain_spec.attestation_subnet_count,
|
||||
ctx.chain_spec.attestation_subnet_count,
|
||||
SYNC_COMMITTEE_SUBNET_COUNT,
|
||||
),
|
||||
max_subscribed_topics: 200,
|
||||
max_subscriptions_per_request: 150, // 148 in theory = (64 attestation + 4 sync committee + 6 core topics) * 2
|
||||
};
|
||||
|
||||
config.gs_config = gossipsub_config(fork_context.clone());
|
||||
config.gs_config = gossipsub_config(ctx.fork_context.clone());
|
||||
|
||||
// If metrics are enabled for gossipsub build the configuration
|
||||
let gossipsub_metrics = ctx
|
||||
.gossipsub_registry
|
||||
.map(|registry| (registry, GossipsubMetricsConfig::default()));
|
||||
|
||||
// Build and configure the Gossipsub behaviour
|
||||
let snappy_transform = SnappyTransform::new(config.gs_config.max_transmit_size());
|
||||
let mut gossipsub = Gossipsub::new_with_subscription_filter_and_transform(
|
||||
MessageAuthenticity::Anonymous,
|
||||
config.gs_config.clone(),
|
||||
None, // No metrics for the time being
|
||||
gossipsub_metrics,
|
||||
filter,
|
||||
snappy_transform,
|
||||
)
|
||||
@@ -246,7 +251,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
|
||||
let thresholds = lighthouse_gossip_thresholds();
|
||||
|
||||
let score_settings = PeerScoreSettings::new(chain_spec, &config.gs_config);
|
||||
let score_settings = PeerScoreSettings::new(ctx.chain_spec, &config.gs_config);
|
||||
|
||||
// Prepare scoring parameters
|
||||
let params = score_settings.get_peer_score_params(
|
||||
@@ -267,6 +272,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
|
||||
let peer_manager_cfg = PeerManagerCfg {
|
||||
discovery_enabled: !config.disable_discovery,
|
||||
metrics_enabled: config.metrics_enabled,
|
||||
target_peer_count: config.target_peers,
|
||||
..Default::default()
|
||||
};
|
||||
@@ -274,7 +280,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
Ok(Behaviour {
|
||||
// Sub-behaviours
|
||||
gossipsub,
|
||||
eth2_rpc: RPC::new(fork_context.clone(), log.clone()),
|
||||
eth2_rpc: RPC::new(ctx.fork_context.clone(), log.clone()),
|
||||
discovery,
|
||||
identify: Identify::new(identify_config),
|
||||
// Auxiliary fields
|
||||
@@ -287,7 +293,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
network_dir: config.network_dir.clone(),
|
||||
log: behaviour_log,
|
||||
score_settings,
|
||||
fork_context,
|
||||
fork_context: ctx.fork_context,
|
||||
update_gossipsub_scores,
|
||||
})
|
||||
}
|
||||
@@ -393,14 +399,15 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
.remove(&topic);
|
||||
|
||||
// unsubscribe from the topic
|
||||
let topic: Topic = topic.into();
|
||||
let libp2p_topic: Topic = topic.clone().into();
|
||||
|
||||
match self.gossipsub.unsubscribe(&topic) {
|
||||
match self.gossipsub.unsubscribe(&libp2p_topic) {
|
||||
Err(_) => {
|
||||
warn!(self.log, "Failed to unsubscribe from topic"; "topic" => %topic);
|
||||
warn!(self.log, "Failed to unsubscribe from topic"; "topic" => %libp2p_topic);
|
||||
false
|
||||
}
|
||||
Ok(v) => {
|
||||
// Inform the network
|
||||
debug!(self.log, "Unsubscribed to topic"; "topic" => %topic);
|
||||
v
|
||||
}
|
||||
@@ -732,6 +739,18 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
|
||||
/// Convenience function to propagate a request.
|
||||
fn propagate_request(&mut self, id: PeerRequestId, peer_id: PeerId, request: Request) {
|
||||
// Increment metrics
|
||||
match &request {
|
||||
Request::Status(_) => {
|
||||
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["status"])
|
||||
}
|
||||
Request::BlocksByRange { .. } => {
|
||||
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blocks_by_range"])
|
||||
}
|
||||
Request::BlocksByRoot { .. } => {
|
||||
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blocks_by_root"])
|
||||
}
|
||||
}
|
||||
self.add_event(BehaviourEvent::RequestReceived {
|
||||
peer_id,
|
||||
id,
|
||||
|
||||
@@ -127,7 +127,7 @@ pub fn use_or_load_enr(
|
||||
pub fn build_or_load_enr<T: EthSpec>(
|
||||
local_key: Keypair,
|
||||
config: &NetworkConfig,
|
||||
enr_fork_id: EnrForkId,
|
||||
enr_fork_id: &EnrForkId,
|
||||
log: &slog::Logger,
|
||||
) -> Result<Enr, String> {
|
||||
// Build the local ENR.
|
||||
@@ -163,7 +163,7 @@ pub fn create_enr_builder_from_config<T: EnrKey>(
|
||||
pub fn build_enr<T: EthSpec>(
|
||||
enr_key: &CombinedKey,
|
||||
config: &NetworkConfig,
|
||||
enr_fork_id: EnrForkId,
|
||||
enr_fork_id: &EnrForkId,
|
||||
) -> Result<Enr, String> {
|
||||
let mut builder = create_enr_builder_from_config(config, true);
|
||||
|
||||
|
||||
@@ -1039,6 +1039,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
|
||||
Discv5Event::SocketUpdated(socket) => {
|
||||
info!(self.log, "Address updated"; "ip" => %socket.ip(), "udp_port" => %socket.port());
|
||||
metrics::inc_counter(&metrics::ADDRESS_UPDATE_COUNT);
|
||||
metrics::check_nat();
|
||||
// Discv5 will have updated our local ENR. We save the updated version
|
||||
// to disk.
|
||||
let enr = self.discv5.local_enr();
|
||||
@@ -1096,7 +1097,7 @@ mod tests {
|
||||
..Default::default()
|
||||
};
|
||||
let enr_key: CombinedKey = CombinedKey::from_libp2p(&keypair).unwrap();
|
||||
let enr: Enr = build_enr::<E>(&enr_key, &config, EnrForkId::default()).unwrap();
|
||||
let enr: Enr = build_enr::<E>(&enr_key, &config, &EnrForkId::default()).unwrap();
|
||||
let log = build_log(slog::Level::Debug, false);
|
||||
let globals = NetworkGlobals::new(
|
||||
enr,
|
||||
|
||||
@@ -10,7 +10,7 @@ mod config;
|
||||
|
||||
#[allow(clippy::mutable_key_type)] // PeerId in hashmaps are no longer permitted by clippy
|
||||
pub mod discovery;
|
||||
mod metrics;
|
||||
pub mod metrics;
|
||||
pub mod peer_manager;
|
||||
pub mod rpc;
|
||||
mod service;
|
||||
@@ -66,13 +66,16 @@ pub use crate::types::{
|
||||
error, Enr, EnrSyncCommitteeBitfield, GossipTopic, NetworkGlobals, PubsubMessage, Subnet,
|
||||
SubnetDiscovery,
|
||||
};
|
||||
|
||||
pub use open_metrics_client;
|
||||
|
||||
pub use behaviour::{BehaviourEvent, Gossipsub, PeerRequestId, Request, Response};
|
||||
pub use config::Config as NetworkConfig;
|
||||
pub use discovery::{CombinedKeyExt, EnrExt, Eth2Enr};
|
||||
pub use discv5;
|
||||
pub use libp2p;
|
||||
pub use libp2p::bandwidth::BandwidthSinks;
|
||||
pub use libp2p::gossipsub::{MessageAcceptance, MessageId, Topic, TopicHash};
|
||||
pub use libp2p::gossipsub::{IdentTopic, MessageAcceptance, MessageId, Topic, TopicHash};
|
||||
pub use libp2p::{core::ConnectedPoint, PeerId, Swarm};
|
||||
pub use libp2p::{multiaddr, Multiaddr};
|
||||
pub use metrics::scrape_discovery_metrics;
|
||||
@@ -82,4 +85,4 @@ pub use peer_manager::{
|
||||
peerdb::PeerDB,
|
||||
ConnectionDirection, PeerConnectionStatus, PeerInfo, PeerManager, SyncInfo, SyncStatus,
|
||||
};
|
||||
pub use service::{load_private_key, Libp2pEvent, Service, NETWORK_KEY_FILENAME};
|
||||
pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_FILENAME};
|
||||
|
||||
@@ -1,16 +1,19 @@
|
||||
pub use lighthouse_metrics::*;
|
||||
|
||||
lazy_static! {
|
||||
pub static ref NAT_OPEN: Result<IntCounter> = try_create_int_counter(
|
||||
"nat_open",
|
||||
"An estimate indicating if the local node is exposed to the internet."
|
||||
);
|
||||
pub static ref ADDRESS_UPDATE_COUNT: Result<IntCounter> = try_create_int_counter(
|
||||
"libp2p_address_update_total",
|
||||
"Count of libp2p socked updated events (when our view of our IP address has changed)"
|
||||
);
|
||||
pub static ref PEERS_CONNECTED: Result<IntGauge> = try_create_int_gauge(
|
||||
"libp2p_peer_connected_peers_total",
|
||||
"libp2p_peers",
|
||||
"Count of libp2p peers currently connected"
|
||||
);
|
||||
pub static ref PEERS_CONNECTED_INTEROP: Result<IntGauge> =
|
||||
try_create_int_gauge("libp2p_peers", "Count of libp2p peers currently connected");
|
||||
|
||||
pub static ref PEER_CONNECT_EVENT_COUNT: Result<IntCounter> = try_create_int_counter(
|
||||
"libp2p_peer_connect_event_total",
|
||||
"Count of libp2p peer connect events (not the current number of connected peers)"
|
||||
@@ -19,6 +22,14 @@ lazy_static! {
|
||||
"libp2p_peer_disconnect_event_total",
|
||||
"Count of libp2p peer disconnect events"
|
||||
);
|
||||
pub static ref DISCOVERY_SENT_BYTES: Result<IntGauge> = try_create_int_gauge(
|
||||
"discovery_sent_bytes",
|
||||
"The number of bytes sent in discovery"
|
||||
);
|
||||
pub static ref DISCOVERY_RECV_BYTES: Result<IntGauge> = try_create_int_gauge(
|
||||
"discovery_recv_bytes",
|
||||
"The number of bytes received in discovery"
|
||||
);
|
||||
pub static ref DISCOVERY_QUEUE: Result<IntGauge> = try_create_int_gauge(
|
||||
"discovery_queue_size",
|
||||
"The number of discovery queries awaiting execution"
|
||||
@@ -31,11 +42,7 @@ lazy_static! {
|
||||
"discovery_sessions",
|
||||
"The number of active discovery sessions with peers"
|
||||
);
|
||||
pub static ref DISCOVERY_REQS_IP: Result<GaugeVec> = try_create_float_gauge_vec(
|
||||
"discovery_reqs_per_ip",
|
||||
"Unsolicited discovery requests per ip per second",
|
||||
&["Addresses"]
|
||||
);
|
||||
|
||||
pub static ref PEERS_PER_CLIENT: Result<IntGaugeVec> = try_create_int_gauge_vec(
|
||||
"libp2p_peers_per_client",
|
||||
"The connected peers via client implementation",
|
||||
@@ -57,6 +64,11 @@ lazy_static! {
|
||||
"RPC errors per client",
|
||||
&["client", "rpc_error", "direction"]
|
||||
);
|
||||
pub static ref TOTAL_RPC_REQUESTS: Result<IntCounterVec> = try_create_int_counter_vec(
|
||||
"libp2p_rpc_requests_total",
|
||||
"RPC requests total",
|
||||
&["type"]
|
||||
);
|
||||
pub static ref PEER_ACTION_EVENTS_PER_CLIENT: Result<IntCounterVec> =
|
||||
try_create_int_counter_vec(
|
||||
"libp2p_peer_actions_per_client",
|
||||
@@ -69,26 +81,57 @@ lazy_static! {
|
||||
"Gossipsub messages that we did not accept, per client",
|
||||
&["client", "validation_result"]
|
||||
);
|
||||
|
||||
pub static ref PEER_SCORE_DISTRIBUTION: Result<IntGaugeVec> =
|
||||
try_create_int_gauge_vec(
|
||||
"peer_score_distribution",
|
||||
"The distribution of connected peer scores",
|
||||
&["position"]
|
||||
);
|
||||
|
||||
pub static ref PEER_SCORE_PER_CLIENT: Result<GaugeVec> =
|
||||
try_create_float_gauge_vec(
|
||||
"peer_score_per_client",
|
||||
"Average score per client",
|
||||
&["client"]
|
||||
);
|
||||
|
||||
/*
|
||||
* Inbound/Outbound peers
|
||||
*/
|
||||
/// The number of peers that dialed us.
|
||||
pub static ref NETWORK_INBOUND_PEERS: Result<IntGauge> =
|
||||
try_create_int_gauge("network_inbound_peers","The number of peers that are currently connected that have dialed us.");
|
||||
|
||||
/// The number of peers that we dialed us.
|
||||
pub static ref NETWORK_OUTBOUND_PEERS: Result<IntGauge> =
|
||||
try_create_int_gauge("network_outbound_peers","The number of peers that are currently connected that we dialed.");
|
||||
}
|
||||
|
||||
/// Checks if we consider the NAT open.
|
||||
///
|
||||
/// Conditions for an open NAT:
|
||||
/// 1. We have 1 or more SOCKET_UPDATED messages. This occurs when discovery has a majority of
|
||||
/// users reporting an external port and our ENR gets updated.
|
||||
/// 2. We have 0 SOCKET_UPDATED messages (can be true if the port was correct on boot), then we
|
||||
/// rely on whether we have any inbound messages. If we have no socket update messages, but
|
||||
/// manage to get at least one inbound peer, we are exposed correctly.
|
||||
pub fn check_nat() {
|
||||
// NAT is already deemed open.
|
||||
if NAT_OPEN.as_ref().map(|v| v.get()).unwrap_or(0) != 0 {
|
||||
return;
|
||||
}
|
||||
if ADDRESS_UPDATE_COUNT.as_ref().map(|v| v.get()).unwrap_or(0) == 0
|
||||
|| NETWORK_INBOUND_PEERS.as_ref().map(|v| v.get()).unwrap_or(0) != 0_i64
|
||||
{
|
||||
inc_counter(&NAT_OPEN);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn scrape_discovery_metrics() {
|
||||
let metrics = discv5::metrics::Metrics::from(discv5::Discv5::raw_metrics());
|
||||
|
||||
set_float_gauge(&DISCOVERY_REQS, metrics.unsolicited_requests_per_second);
|
||||
|
||||
set_gauge(&DISCOVERY_SESSIONS, metrics.active_sessions as i64);
|
||||
|
||||
let process_gauge_vec = |gauge: &Result<GaugeVec>, metrics: discv5::metrics::Metrics| {
|
||||
if let Ok(gauge_vec) = gauge {
|
||||
gauge_vec.reset();
|
||||
for (ip, value) in metrics.requests_per_ip_per_second.iter() {
|
||||
if let Ok(metric) = gauge_vec.get_metric_with_label_values(&[&format!("{:?}", ip)])
|
||||
{
|
||||
metric.set(*value);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
process_gauge_vec(&DISCOVERY_REQS_IP, metrics);
|
||||
set_gauge(&DISCOVERY_SENT_BYTES, metrics.bytes_sent as i64);
|
||||
set_gauge(&DISCOVERY_RECV_BYTES, metrics.bytes_recv as i64);
|
||||
}
|
||||
|
||||
@@ -16,6 +16,8 @@ pub struct Config {
|
||||
/* Peer count related configurations */
|
||||
/// Whether discovery is enabled.
|
||||
pub discovery_enabled: bool,
|
||||
/// Whether metrics are enabled.
|
||||
pub metrics_enabled: bool,
|
||||
/// Target number of peers to connect to.
|
||||
pub target_peer_count: usize,
|
||||
|
||||
@@ -34,6 +36,7 @@ impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Config {
|
||||
discovery_enabled: true,
|
||||
metrics_enabled: false,
|
||||
target_peer_count: DEFAULT_TARGET_PEERS,
|
||||
status_interval: DEFAULT_STATUS_INTERVAL,
|
||||
ping_interval_inbound: DEFAULT_PING_INTERVAL_INBOUND,
|
||||
|
||||
@@ -8,13 +8,14 @@ use crate::{Subnet, SubnetDiscovery};
|
||||
use discv5::Enr;
|
||||
use hashset_delay::HashSetDelay;
|
||||
use libp2p::identify::IdentifyInfo;
|
||||
use peerdb::{BanOperation, BanResult, ScoreUpdateResult};
|
||||
use peerdb::{client::ClientKind, BanOperation, BanResult, ScoreUpdateResult};
|
||||
use slog::{debug, error, warn};
|
||||
use smallvec::SmallVec;
|
||||
use std::{
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use strum::IntoEnumIterator;
|
||||
use types::{EthSpec, SyncSubnetId};
|
||||
|
||||
pub use libp2p::core::{identity::Keypair, Multiaddr};
|
||||
@@ -71,6 +72,8 @@ pub struct PeerManager<TSpec: EthSpec> {
|
||||
heartbeat: tokio::time::Interval,
|
||||
/// Keeps track of whether the discovery service is enabled or not.
|
||||
discovery_enabled: bool,
|
||||
/// Keeps track if the current instance is reporting metrics or not.
|
||||
metrics_enabled: bool,
|
||||
/// The logger associated with the `PeerManager`.
|
||||
log: slog::Logger,
|
||||
}
|
||||
@@ -111,6 +114,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
||||
) -> error::Result<Self> {
|
||||
let config::Config {
|
||||
discovery_enabled,
|
||||
metrics_enabled,
|
||||
target_peer_count,
|
||||
status_interval,
|
||||
ping_interval_inbound,
|
||||
@@ -130,6 +134,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
||||
sync_committee_subnets: Default::default(),
|
||||
heartbeat,
|
||||
discovery_enabled,
|
||||
metrics_enabled,
|
||||
log: log.clone(),
|
||||
})
|
||||
}
|
||||
@@ -378,19 +383,21 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
||||
"protocols" => ?info.protocols
|
||||
);
|
||||
|
||||
// update the peer client kind metric
|
||||
if let Some(v) = metrics::get_int_gauge(
|
||||
&metrics::PEERS_PER_CLIENT,
|
||||
&[&peer_info.client().kind.to_string()],
|
||||
// update the peer client kind metric if the peer is connected
|
||||
if matches!(
|
||||
peer_info.connection_status(),
|
||||
PeerConnectionStatus::Connected { .. }
|
||||
| PeerConnectionStatus::Disconnecting { .. }
|
||||
) {
|
||||
v.inc()
|
||||
};
|
||||
if let Some(v) = metrics::get_int_gauge(
|
||||
&metrics::PEERS_PER_CLIENT,
|
||||
&[&previous_kind.to_string()],
|
||||
) {
|
||||
v.dec()
|
||||
};
|
||||
metrics::inc_gauge_vec(
|
||||
&metrics::PEERS_PER_CLIENT,
|
||||
&[&peer_info.client().kind.to_string()],
|
||||
);
|
||||
metrics::dec_gauge_vec(
|
||||
&metrics::PEERS_PER_CLIENT,
|
||||
&[&previous_kind.to_string()],
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
error!(self.log, "Received an Identify response from an unknown peer"; "peer_id" => peer_id.to_string());
|
||||
@@ -606,6 +613,46 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
||||
}
|
||||
}
|
||||
|
||||
// This function updates metrics for all connected peers.
|
||||
fn update_connected_peer_metrics(&self) {
|
||||
// Do nothing if we don't have metrics enabled.
|
||||
if !self.metrics_enabled {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut connected_peer_count = 0;
|
||||
let mut inbound_connected_peers = 0;
|
||||
let mut outbound_connected_peers = 0;
|
||||
let mut clients_per_peer = HashMap::new();
|
||||
|
||||
for (_peer, peer_info) in self.network_globals.peers.read().connected_peers() {
|
||||
connected_peer_count += 1;
|
||||
if let PeerConnectionStatus::Connected { n_in, .. } = peer_info.connection_status() {
|
||||
if *n_in > 0 {
|
||||
inbound_connected_peers += 1;
|
||||
} else {
|
||||
outbound_connected_peers += 1;
|
||||
}
|
||||
}
|
||||
*clients_per_peer
|
||||
.entry(peer_info.client().kind.to_string())
|
||||
.or_default() += 1;
|
||||
}
|
||||
|
||||
metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peer_count);
|
||||
metrics::set_gauge(&metrics::NETWORK_INBOUND_PEERS, inbound_connected_peers);
|
||||
metrics::set_gauge(&metrics::NETWORK_OUTBOUND_PEERS, outbound_connected_peers);
|
||||
|
||||
for client_kind in ClientKind::iter() {
|
||||
let value = clients_per_peer.get(&client_kind.to_string()).unwrap_or(&0);
|
||||
metrics::set_gauge_vec(
|
||||
&metrics::PEERS_PER_CLIENT,
|
||||
&[&client_kind.to_string()],
|
||||
*value as i64,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/* Internal functions */
|
||||
|
||||
/// Sets a peer as connected as long as their reputation allows it
|
||||
@@ -705,22 +752,6 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
||||
// increment prometheus metrics
|
||||
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
|
||||
metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peers);
|
||||
metrics::set_gauge(&metrics::PEERS_CONNECTED_INTEROP, connected_peers);
|
||||
|
||||
// Increment the PEERS_PER_CLIENT metric
|
||||
if let Some(kind) = self
|
||||
.network_globals
|
||||
.peers
|
||||
.read()
|
||||
.peer_info(peer_id)
|
||||
.map(|peer_info| peer_info.client().kind.clone())
|
||||
{
|
||||
if let Some(v) =
|
||||
metrics::get_int_gauge(&metrics::PEERS_PER_CLIENT, &[&kind.to_string()])
|
||||
{
|
||||
v.inc()
|
||||
};
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
@@ -802,6 +833,9 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
||||
self.handle_score_action(&peer_id, action, None);
|
||||
}
|
||||
|
||||
// Update peer score metrics;
|
||||
self.update_peer_score_metrics();
|
||||
|
||||
// Maintain minimum count for sync committee peers.
|
||||
self.maintain_sync_committee_peers();
|
||||
|
||||
@@ -840,6 +874,75 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
||||
self.disconnect_peer(peer_id, GoodbyeReason::TooManyPeers);
|
||||
}
|
||||
}
|
||||
|
||||
// Update metrics related to peer scoring.
|
||||
fn update_peer_score_metrics(&self) {
|
||||
if !self.metrics_enabled {
|
||||
return;
|
||||
}
|
||||
// reset the gauges
|
||||
let _ = metrics::PEER_SCORE_DISTRIBUTION
|
||||
.as_ref()
|
||||
.map(|gauge| gauge.reset());
|
||||
let _ = metrics::PEER_SCORE_PER_CLIENT
|
||||
.as_ref()
|
||||
.map(|gauge| gauge.reset());
|
||||
|
||||
let mut avg_score_per_client: HashMap<String, (f64, usize)> = HashMap::with_capacity(5);
|
||||
{
|
||||
let peers_db_read_lock = self.network_globals.peers.read();
|
||||
let connected_peers = peers_db_read_lock.best_peers_by_status(PeerInfo::is_connected);
|
||||
let total_peers = connected_peers.len();
|
||||
for (id, (_peer, peer_info)) in connected_peers.into_iter().enumerate() {
|
||||
// First quartile
|
||||
if id == 0 {
|
||||
metrics::set_gauge_vec(
|
||||
&metrics::PEER_SCORE_DISTRIBUTION,
|
||||
&["1st"],
|
||||
peer_info.score().score() as i64,
|
||||
);
|
||||
} else if id == (total_peers * 3 / 4).saturating_sub(1) {
|
||||
metrics::set_gauge_vec(
|
||||
&metrics::PEER_SCORE_DISTRIBUTION,
|
||||
&["3/4"],
|
||||
peer_info.score().score() as i64,
|
||||
);
|
||||
} else if id == (total_peers / 2).saturating_sub(1) {
|
||||
metrics::set_gauge_vec(
|
||||
&metrics::PEER_SCORE_DISTRIBUTION,
|
||||
&["1/2"],
|
||||
peer_info.score().score() as i64,
|
||||
);
|
||||
} else if id == (total_peers / 4).saturating_sub(1) {
|
||||
metrics::set_gauge_vec(
|
||||
&metrics::PEER_SCORE_DISTRIBUTION,
|
||||
&["1/4"],
|
||||
peer_info.score().score() as i64,
|
||||
);
|
||||
} else if id == total_peers.saturating_sub(1) {
|
||||
metrics::set_gauge_vec(
|
||||
&metrics::PEER_SCORE_DISTRIBUTION,
|
||||
&["last"],
|
||||
peer_info.score().score() as i64,
|
||||
);
|
||||
}
|
||||
|
||||
let mut score_peers: &mut (f64, usize) = avg_score_per_client
|
||||
.entry(peer_info.client().kind.to_string())
|
||||
.or_default();
|
||||
score_peers.0 += peer_info.score().score();
|
||||
score_peers.1 += 1;
|
||||
}
|
||||
} // read lock ended
|
||||
|
||||
for (client, (score, peers)) in avg_score_per_client {
|
||||
metrics::set_float_gauge_vec(
|
||||
&metrics::PEER_SCORE_PER_CLIENT,
|
||||
&[&client.to_string()],
|
||||
score / (peers as f64),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum ConnectingType {
|
||||
|
||||
@@ -111,8 +111,11 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
|
||||
endpoint: &ConnectedPoint,
|
||||
_failed_addresses: Option<&Vec<Multiaddr>>,
|
||||
) {
|
||||
// Log the connection
|
||||
debug!(self.log, "Connection established"; "peer_id" => %peer_id, "connection" => ?endpoint.to_endpoint());
|
||||
// Check NAT if metrics are enabled
|
||||
if self.network_globals.local_enr.read().udp().is_some() {
|
||||
metrics::check_nat();
|
||||
}
|
||||
|
||||
// Check to make sure the peer is not supposed to be banned
|
||||
match self.ban_status(peer_id) {
|
||||
@@ -150,10 +153,8 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
|
||||
return;
|
||||
}
|
||||
|
||||
// Register the newly connected peer (regardless if we are about to disconnect them).
|
||||
// NOTE: We don't register peers that we are disconnecting immediately. The network service
|
||||
// does not need to know about these peers.
|
||||
// let enr
|
||||
match endpoint {
|
||||
ConnectedPoint::Listener { send_back_addr, .. } => {
|
||||
self.inject_connect_ingoing(peer_id, send_back_addr.clone(), None);
|
||||
@@ -167,12 +168,9 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
|
||||
}
|
||||
}
|
||||
|
||||
let connected_peers = self.network_globals.connected_peers() as i64;
|
||||
|
||||
// increment prometheus metrics
|
||||
self.update_connected_peer_metrics();
|
||||
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
|
||||
metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peers);
|
||||
metrics::set_gauge(&metrics::PEERS_CONNECTED_INTEROP, connected_peers);
|
||||
}
|
||||
|
||||
fn inject_disconnected(&mut self, peer_id: &PeerId) {
|
||||
@@ -190,21 +188,6 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
|
||||
self.events
|
||||
.push(PeerManagerEvent::PeerDisconnected(*peer_id));
|
||||
debug!(self.log, "Peer disconnected"; "peer_id" => %peer_id);
|
||||
|
||||
// Decrement the PEERS_PER_CLIENT metric
|
||||
if let Some(kind) = self
|
||||
.network_globals
|
||||
.peers
|
||||
.read()
|
||||
.peer_info(peer_id)
|
||||
.map(|info| info.client().kind.clone())
|
||||
{
|
||||
if let Some(v) =
|
||||
metrics::get_int_gauge(&metrics::PEERS_PER_CLIENT, &[&kind.to_string()])
|
||||
{
|
||||
v.dec()
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: It may be the case that a rejected node, due to too many peers is disconnected
|
||||
@@ -212,12 +195,9 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
|
||||
// reference so that peer manager can track this peer.
|
||||
self.inject_disconnect(peer_id);
|
||||
|
||||
let connected_peers = self.network_globals.connected_peers() as i64;
|
||||
|
||||
// Update the prometheus metrics
|
||||
self.update_connected_peer_metrics();
|
||||
metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT);
|
||||
metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peers);
|
||||
metrics::set_gauge(&metrics::PEERS_CONNECTED_INTEROP, connected_peers);
|
||||
}
|
||||
|
||||
fn inject_address_change(
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
|
||||
use libp2p::identify::IdentifyInfo;
|
||||
use serde::Serialize;
|
||||
use strum::{AsRefStr, AsStaticStr};
|
||||
use strum::{AsRefStr, AsStaticStr, EnumIter};
|
||||
|
||||
/// Various client and protocol information related to a node.
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
@@ -21,7 +21,7 @@ pub struct Client {
|
||||
pub agent_string: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, PartialEq, AsRefStr, AsStaticStr)]
|
||||
#[derive(Clone, Debug, Serialize, PartialEq, AsRefStr, AsStaticStr, EnumIter)]
|
||||
pub enum ClientKind {
|
||||
/// A lighthouse node (the best kind).
|
||||
Lighthouse,
|
||||
|
||||
@@ -19,8 +19,6 @@ use PeerConnectionStatus::*;
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
#[serde(bound = "T: EthSpec")]
|
||||
pub struct PeerInfo<T: EthSpec> {
|
||||
/// The connection status of the peer
|
||||
_status: PeerStatus,
|
||||
/// The peers reputation
|
||||
score: Score,
|
||||
/// Client managing this peer
|
||||
@@ -57,7 +55,6 @@ pub struct PeerInfo<T: EthSpec> {
|
||||
impl<TSpec: EthSpec> Default for PeerInfo<TSpec> {
|
||||
fn default() -> PeerInfo<TSpec> {
|
||||
PeerInfo {
|
||||
_status: Default::default(),
|
||||
score: Score::default(),
|
||||
client: Client::default(),
|
||||
connection_status: Default::default(),
|
||||
@@ -387,21 +384,6 @@ impl<T: EthSpec> PeerInfo<T> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
/// The current health status of the peer.
|
||||
pub enum PeerStatus {
|
||||
/// The peer is healthy.
|
||||
Healthy,
|
||||
/// The peer is clogged. It has not been responding to requests on time.
|
||||
_Clogged,
|
||||
}
|
||||
|
||||
impl Default for PeerStatus {
|
||||
fn default() -> Self {
|
||||
PeerStatus::Healthy
|
||||
}
|
||||
}
|
||||
|
||||
/// Connection Direction of connection.
|
||||
#[derive(Debug, Clone, Serialize, AsRefStr)]
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
|
||||
@@ -20,6 +20,7 @@ use libp2p::{
|
||||
swarm::{SwarmBuilder, SwarmEvent},
|
||||
PeerId, Swarm, Transport,
|
||||
};
|
||||
use open_metrics_client::registry::Registry;
|
||||
use slog::{crit, debug, info, o, trace, warn, Logger};
|
||||
use ssz::Decode;
|
||||
use std::fs::File;
|
||||
@@ -62,27 +63,34 @@ pub struct Service<TSpec: EthSpec> {
|
||||
pub log: Logger,
|
||||
}
|
||||
|
||||
pub struct Context<'a> {
|
||||
pub config: &'a NetworkConfig,
|
||||
pub enr_fork_id: EnrForkId,
|
||||
pub fork_context: Arc<ForkContext>,
|
||||
pub chain_spec: &'a ChainSpec,
|
||||
pub gossipsub_registry: Option<&'a mut Registry>,
|
||||
}
|
||||
|
||||
impl<TSpec: EthSpec> Service<TSpec> {
|
||||
pub async fn new(
|
||||
executor: task_executor::TaskExecutor,
|
||||
config: &NetworkConfig,
|
||||
enr_fork_id: EnrForkId,
|
||||
ctx: Context<'_>,
|
||||
log: &Logger,
|
||||
fork_context: Arc<ForkContext>,
|
||||
chain_spec: &ChainSpec,
|
||||
) -> error::Result<(Arc<NetworkGlobals<TSpec>>, Self)> {
|
||||
let log = log.new(o!("service"=> "libp2p"));
|
||||
trace!(log, "Libp2p Service starting");
|
||||
|
||||
let config = ctx.config;
|
||||
// initialise the node's ID
|
||||
let local_keypair = load_private_key(config, &log);
|
||||
|
||||
// Create an ENR or load from disk if appropriate
|
||||
let enr =
|
||||
enr::build_or_load_enr::<TSpec>(local_keypair.clone(), config, enr_fork_id, &log)?;
|
||||
enr::build_or_load_enr::<TSpec>(local_keypair.clone(), config, &ctx.enr_fork_id, &log)?;
|
||||
|
||||
let local_peer_id = enr.peer_id();
|
||||
|
||||
// Construct the metadata
|
||||
let meta_data = load_or_build_metadata(&config.network_dir, &log);
|
||||
|
||||
// set up a collection of variables accessible outside of the network crate
|
||||
@@ -113,15 +121,8 @@ impl<TSpec: EthSpec> Service<TSpec> {
|
||||
.map_err(|e| format!("Failed to build transport: {:?}", e))?;
|
||||
|
||||
// Lighthouse network behaviour
|
||||
let behaviour = Behaviour::new(
|
||||
&local_keypair,
|
||||
config.clone(),
|
||||
network_globals.clone(),
|
||||
&log,
|
||||
fork_context,
|
||||
chain_spec,
|
||||
)
|
||||
.await?;
|
||||
let behaviour =
|
||||
Behaviour::new(&local_keypair, ctx, network_globals.clone(), &log).await?;
|
||||
|
||||
// use the executor for libp2p
|
||||
struct Executor(task_executor::TaskExecutor);
|
||||
|
||||
Reference in New Issue
Block a user