diff --git a/Cargo.lock b/Cargo.lock index 0c98ea3e92..e6285fbb40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2624,7 +2624,7 @@ checksum = "c7d73b3f436185384286bd8098d17ec07c9a7d2388a6599f824d8502b529702a" [[package]] name = "libp2p" version = "0.25.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df" +source = "git+https://github.com/sigp/rust-libp2p?rev=03f998022ce2f566a6c6e6c4206bc0ce4d45109f#03f998022ce2f566a6c6e6c4206bc0ce4d45109f" dependencies = [ "atomic", "bytes 0.5.6", @@ -2641,7 +2641,7 @@ dependencies = [ "libp2p-tcp", "libp2p-websocket", "multihash", - "parity-multiaddr 0.9.1 (git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df)", + "parity-multiaddr 0.9.1 (git+https://github.com/sigp/rust-libp2p?rev=03f998022ce2f566a6c6e6c4206bc0ce4d45109f)", "parking_lot 0.10.2", "pin-project", "smallvec 1.4.2", @@ -2685,7 +2685,7 @@ dependencies = [ [[package]] name = "libp2p-core" version = "0.21.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df" +source = "git+https://github.com/sigp/rust-libp2p?rev=03f998022ce2f566a6c6e6c4206bc0ce4d45109f#03f998022ce2f566a6c6e6c4206bc0ce4d45109f" dependencies = [ "asn1_der", "bs58", @@ -2698,8 +2698,8 @@ dependencies = [ "libsecp256k1", "log 0.4.11", "multihash", - "multistream-select 0.8.2 (git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df)", - "parity-multiaddr 0.9.1 (git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df)", + "multistream-select 0.8.2 (git+https://github.com/sigp/rust-libp2p?rev=03f998022ce2f566a6c6e6c4206bc0ce4d45109f)", + "parity-multiaddr 0.9.1 (git+https://github.com/sigp/rust-libp2p?rev=03f998022ce2f566a6c6e6c4206bc0ce4d45109f)", "parking_lot 0.10.2", "pin-project", "prost", @@ -2718,7 +2718,7 @@ dependencies = [ [[package]] name = "libp2p-core-derive" version = "0.20.2" -source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df" +source = "git+https://github.com/sigp/rust-libp2p?rev=03f998022ce2f566a6c6e6c4206bc0ce4d45109f#03f998022ce2f566a6c6e6c4206bc0ce4d45109f" dependencies = [ "quote", "syn", @@ -2727,7 +2727,7 @@ dependencies = [ [[package]] name = "libp2p-dns" version = "0.21.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df" +source = "git+https://github.com/sigp/rust-libp2p?rev=03f998022ce2f566a6c6e6c4206bc0ce4d45109f#03f998022ce2f566a6c6e6c4206bc0ce4d45109f" dependencies = [ "futures 0.3.5", "libp2p-core 0.21.0", @@ -2737,7 +2737,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" version = "0.22.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df" +source = "git+https://github.com/sigp/rust-libp2p?rev=03f998022ce2f566a6c6e6c4206bc0ce4d45109f#03f998022ce2f566a6c6e6c4206bc0ce4d45109f" dependencies = [ "base64 0.12.3", "byteorder", @@ -2761,7 +2761,7 @@ dependencies = [ [[package]] name = "libp2p-identify" version = "0.22.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df" +source = "git+https://github.com/sigp/rust-libp2p?rev=03f998022ce2f566a6c6e6c4206bc0ce4d45109f#03f998022ce2f566a6c6e6c4206bc0ce4d45109f" dependencies = [ "futures 0.3.5", "libp2p-core 0.21.0", @@ -2776,7 +2776,7 @@ dependencies = [ [[package]] name = "libp2p-mplex" version = "0.21.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df" +source = "git+https://github.com/sigp/rust-libp2p?rev=03f998022ce2f566a6c6e6c4206bc0ce4d45109f#03f998022ce2f566a6c6e6c4206bc0ce4d45109f" dependencies = [ "bytes 0.5.6", "fnv", @@ -2791,7 +2791,7 @@ dependencies = [ [[package]] name = "libp2p-noise" version = "0.23.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df" +source = "git+https://github.com/sigp/rust-libp2p?rev=03f998022ce2f566a6c6e6c4206bc0ce4d45109f#03f998022ce2f566a6c6e6c4206bc0ce4d45109f" dependencies = [ "bytes 0.5.6", "curve25519-dalek 2.1.0", @@ -2812,7 +2812,7 @@ dependencies = [ [[package]] name = "libp2p-swarm" version = "0.22.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df" +source = "git+https://github.com/sigp/rust-libp2p?rev=03f998022ce2f566a6c6e6c4206bc0ce4d45109f#03f998022ce2f566a6c6e6c4206bc0ce4d45109f" dependencies = [ "either", "futures 0.3.5", @@ -2827,7 +2827,7 @@ dependencies = [ [[package]] name = "libp2p-tcp" version = "0.21.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df" +source = "git+https://github.com/sigp/rust-libp2p?rev=03f998022ce2f566a6c6e6c4206bc0ce4d45109f#03f998022ce2f566a6c6e6c4206bc0ce4d45109f" dependencies = [ "futures 0.3.5", "futures-timer", @@ -2842,7 +2842,7 @@ dependencies = [ [[package]] name = "libp2p-websocket" version = "0.22.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df" +source = "git+https://github.com/sigp/rust-libp2p?rev=03f998022ce2f566a6c6e6c4206bc0ce4d45109f#03f998022ce2f566a6c6e6c4206bc0ce4d45109f" dependencies = [ "async-tls", "either", @@ -3223,7 +3223,7 @@ checksum = "1255076139a83bb467426e7f8d0134968a8118844faa755985e077cf31850333" [[package]] name = "multistream-select" version = "0.8.2" -source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df" +source = "git+https://github.com/sigp/rust-libp2p?rev=03f998022ce2f566a6c6e6c4206bc0ce4d45109f#03f998022ce2f566a6c6e6c4206bc0ce4d45109f" dependencies = [ "bytes 0.5.6", "futures 0.3.5", @@ -3517,7 +3517,7 @@ dependencies = [ [[package]] name = "parity-multiaddr" version = "0.9.1" -source = "git+https://github.com/sigp/rust-libp2p?rev=d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df#d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df" +source = "git+https://github.com/sigp/rust-libp2p?rev=03f998022ce2f566a6c6e6c4206bc0ce4d45109f#03f998022ce2f566a6c6e6c4206bc0ce4d45109f" dependencies = [ "arrayref", "bs58", diff --git a/beacon_node/eth2_libp2p/Cargo.toml b/beacon_node/eth2_libp2p/Cargo.toml index b7b25a692d..1f4870110f 100644 --- a/beacon_node/eth2_libp2p/Cargo.toml +++ b/beacon_node/eth2_libp2p/Cargo.toml @@ -41,7 +41,7 @@ rand = "0.7.3" [dependencies.libp2p] #version = "0.23.0" git = "https://github.com/sigp/rust-libp2p" -rev = "d0f9d6b9b3fef9616026f3ddf11d75fe9f7a41df" +rev = "03f998022ce2f566a6c6e6c4206bc0ce4d45109f" default-features = false features = ["websocket", "identify", "mplex", "noise", "gossipsub", "dns", "tcp-tokio"] diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index bd8334ec26..0715654497 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -249,8 +249,28 @@ impl Behaviour { for topic in message.topics(GossipEncoding::default(), self.enr_fork_id.fork_digest) { match message.encode(GossipEncoding::default()) { Ok(message_data) => { - if let Err(e) = self.gossipsub.publish(topic.into(), message_data) { + if let Err(e) = self.gossipsub.publish(topic.clone().into(), message_data) { slog::warn!(self.log, "Could not publish message"; "error" => format!("{:?}", e)); + + // add to metrics + match topic.kind() { + GossipKind::Attestation(subnet_id) => { + if let Some(v) = metrics::get_int_gauge( + &metrics::FAILED_ATTESTATION_PUBLISHES_PER_SUBNET, + &[&subnet_id.to_string()], + ) { + v.inc() + }; + } + kind => { + if let Some(v) = metrics::get_int_gauge( + &metrics::FAILED_PUBLISHES_PER_MAIN_TOPIC, + &[&format!("{:?}", kind)], + ) { + v.inc() + }; + } + } } } Err(e) => crit!(self.log, "Could not publish message"; "error" => e), @@ -471,23 +491,9 @@ impl Behaviour { } } GossipsubEvent::Subscribed { peer_id, topic } => { - if let Some(topic_metric) = metrics::get_int_gauge( - &metrics::GOSSIPSUB_SUBSCRIBED_PEERS_COUNT, - &[topic.as_str()], - ) { - topic_metric.inc() - } - self.add_event(BehaviourEvent::PeerSubscribed(peer_id, topic)); } - GossipsubEvent::Unsubscribed { peer_id: _, topic } => { - if let Some(topic_metric) = metrics::get_int_gauge( - &metrics::GOSSIPSUB_SUBSCRIBED_PEERS_COUNT, - &[topic.as_str()], - ) { - topic_metric.dec() - } - } + GossipsubEvent::Unsubscribed { .. } => {} } } diff --git a/beacon_node/eth2_libp2p/src/lib.rs b/beacon_node/eth2_libp2p/src/lib.rs index 6792df62d2..6fdbb3d31a 100644 --- a/beacon_node/eth2_libp2p/src/lib.rs +++ b/beacon_node/eth2_libp2p/src/lib.rs @@ -19,7 +19,7 @@ pub use behaviour::{BehaviourEvent, PeerRequestId, Request, Response}; pub use config::Config as NetworkConfig; pub use discovery::{CombinedKeyExt, EnrExt, Eth2Enr}; pub use discv5; -pub use libp2p::gossipsub::{MessageAcceptance, MessageId, Topic, TopicHash}; +pub use libp2p::gossipsub::{Gossipsub, MessageAcceptance, MessageId, Topic, TopicHash}; pub use libp2p::{core::ConnectedPoint, PeerId, Swarm}; pub use libp2p::{multiaddr, Multiaddr}; pub use metrics::scrape_discovery_metrics; diff --git a/beacon_node/eth2_libp2p/src/metrics.rs b/beacon_node/eth2_libp2p/src/metrics.rs index b8677d3055..bffe729220 100644 --- a/beacon_node/eth2_libp2p/src/metrics.rs +++ b/beacon_node/eth2_libp2p/src/metrics.rs @@ -34,9 +34,20 @@ lazy_static! { "Unsolicited discovery requests per ip per second", &["Addresses"] ); - pub static ref GOSSIPSUB_SUBSCRIBED_PEERS_COUNT: Result = try_create_int_gauge_vec( - "gossipsub_peers_per_topic_count", - "Peers subscribed per topic", + pub static ref PEERS_PER_CLIENT: Result = try_create_int_gauge_vec( + "libp2p_peers_per_client", + "The connected peers via client implementation", + &["Client"] + ); + pub static ref FAILED_ATTESTATION_PUBLISHES_PER_SUBNET: Result = + try_create_int_gauge_vec( + "gossipsub_failed_attestation_publishes_per_subnet", + "Failed attestation publishes per subnet", + &["subnet"] + ); + pub static ref FAILED_PUBLISHES_PER_MAIN_TOPIC: Result = try_create_int_gauge_vec( + "gossipsub_failed_publishes_per_main_topic", + "Failed gossip publishes", &["topic_hash"] ); } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/client.rs b/beacon_node/eth2_libp2p/src/peer_manager/client.rs index ba2376c252..f23f32be60 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/client.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/client.rs @@ -20,7 +20,7 @@ pub struct Client { pub agent_string: Option, } -#[derive(Clone, Debug, Serialize)] +#[derive(Clone, Debug, Serialize, PartialEq)] pub enum ClientKind { /// A lighthouse node (the best kind). Lighthouse, @@ -98,6 +98,12 @@ impl std::fmt::Display for Client { } } +impl std::fmt::Display for ClientKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + // helper function to identify clients from their agent_version. Returns the client // kind and it's associated version and the OS kind. fn client_from_agent_version(agent_version: &str) -> (ClientKind, String, String) { diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 861b27faac..2d40f8a0d4 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -239,6 +239,27 @@ impl PeerManager { /// /// This is also called when dialing a peer fails. pub fn notify_disconnect(&mut self, peer_id: &PeerId) { + // Decrement the PEERS_PER_CLIENT metric + if let Some(kind) = self + .network_globals + .peers + .read() + .peer_info(peer_id) + .and_then(|peer_info| { + if let Connected { .. } = peer_info.connection_status { + Some(peer_info.client.kind.clone()) + } else { + None + } + }) + { + if let Some(v) = + metrics::get_int_gauge(&metrics::PEERS_PER_CLIENT, &[&kind.to_string()]) + { + v.dec() + }; + } + self.network_globals.peers.write().disconnect(peer_id); // remove the ping and status timer for the peer @@ -296,8 +317,25 @@ impl PeerManager { /// Updates `PeerInfo` with `identify` information. pub fn identify(&mut self, peer_id: &PeerId, info: &IdentifyInfo) { if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { + let previous_kind = peer_info.client.kind.clone(); peer_info.client = client::Client::from_identify_info(info); peer_info.listening_addresses = info.listen_addrs.clone(); + + if previous_kind != peer_info.client.kind { + // update the peer client kind metric + if let Some(v) = metrics::get_int_gauge( + &metrics::PEERS_PER_CLIENT, + &[&peer_info.client.kind.to_string()], + ) { + v.inc() + }; + if let Some(v) = metrics::get_int_gauge( + &metrics::PEERS_PER_CLIENT, + &[&previous_kind.to_string()], + ) { + v.dec() + }; + } } else { crit!(self.log, "Received an Identify response from an unknown peer"; "peer_id" => peer_id.to_string()); } @@ -551,7 +589,10 @@ impl PeerManager { } match connection { - ConnectingType::Dialing => peerdb.dialing_peer(peer_id), + ConnectingType::Dialing => { + peerdb.dialing_peer(peer_id); + return true; + } ConnectingType::IngoingConnected => peerdb.connect_outgoing(peer_id), ConnectingType::OutgoingConnected => peerdb.connect_ingoing(peer_id), } @@ -568,6 +609,21 @@ impl PeerManager { self.network_globals.connected_peers() as i64, ); + // Increment the PEERS_PER_CLIENT metric + if let Some(kind) = self + .network_globals + .peers + .read() + .peer_info(peer_id) + .map(|peer_info| peer_info.client.kind.clone()) + { + if let Some(v) = + metrics::get_int_gauge(&metrics::PEERS_PER_CLIENT, &[&kind.to_string()]) + { + v.inc() + }; + } + true } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs index 7ead3bb26e..2448ee424a 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs @@ -4,7 +4,7 @@ use super::PeerSyncStatus; use crate::rpc::MetaData; use crate::Multiaddr; use serde::{ - ser::{SerializeStructVariant, Serializer}, + ser::{SerializeStruct, Serializer}, Serialize, }; use std::net::IpAddr; @@ -120,29 +120,51 @@ pub enum PeerConnectionStatus { /// Serialization for http requests. impl Serialize for PeerConnectionStatus { fn serialize(&self, serializer: S) -> Result { + let mut s = serializer.serialize_struct("connection_status", 5)?; match self { Connected { n_in, n_out } => { - let mut s = serializer.serialize_struct_variant("", 0, "Connected", 2)?; - s.serialize_field("in", n_in)?; - s.serialize_field("out", n_out)?; + s.serialize_field("status", "connected")?; + s.serialize_field("connections_in", n_in)?; + s.serialize_field("connections_out", n_out)?; + s.serialize_field("last_seen", &0)?; + s.serialize_field("banned_ips", &Vec::::new())?; s.end() } Disconnected { since } => { - let mut s = serializer.serialize_struct_variant("", 1, "Disconnected", 1)?; - s.serialize_field("since", &since.elapsed().as_secs())?; + s.serialize_field("status", "disconnected")?; + s.serialize_field("connections_in", &0)?; + s.serialize_field("connections_out", &0)?; + s.serialize_field("last_seen", &since.elapsed().as_secs())?; + s.serialize_field("banned_ips", &Vec::::new())?; s.end() } - Banned { since, .. } => { - let mut s = serializer.serialize_struct_variant("", 2, "Banned", 1)?; - s.serialize_field("since", &since.elapsed().as_secs())?; + Banned { + since, + ip_addresses, + } => { + s.serialize_field("status", "banned")?; + s.serialize_field("connections_in", &0)?; + s.serialize_field("connections_out", &0)?; + s.serialize_field("last_seen", &since.elapsed().as_secs())?; + s.serialize_field("banned_ips", &ip_addresses)?; s.end() } Dialing { since } => { - let mut s = serializer.serialize_struct_variant("", 3, "Dialing", 1)?; - s.serialize_field("since", &since.elapsed().as_secs())?; + s.serialize_field("status", "dialing")?; + s.serialize_field("connections_in", &0)?; + s.serialize_field("connections_out", &0)?; + s.serialize_field("last_seen", &since.elapsed().as_secs())?; + s.serialize_field("banned_ips", &Vec::::new())?; + s.end() + } + Unknown => { + s.serialize_field("status", "unknown")?; + s.serialize_field("connections_in", &0)?; + s.serialize_field("connections_out", &0)?; + s.serialize_field("last_seen", &0)?; + s.serialize_field("banned_ips", &Vec::::new())?; s.end() } - Unknown => serializer.serialize_unit_variant("", 4, "Unknown"), } } } diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index e178f531d4..e5167a065f 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -1,36 +1,91 @@ use beacon_chain::attestation_verification::Error as AttnError; pub use lighthouse_metrics::*; +lazy_static! { + + /* + * Gossip subnets and scoring + */ + pub static ref PEERS_PER_PROTOCOL: Result = try_create_int_gauge_vec( + "gossipsub_peers_per_protocol", + "Peers via supported protocol", + &["protocol"] + ); + + pub static ref GOSSIPSUB_SUBSCRIBED_SUBNET_TOPIC: Result = try_create_int_gauge_vec( + "gossipsub_subscribed_subnets", + "Subnets currently subscribed to", + &["subnet"] + ); + + pub static ref GOSSIPSUB_SUBSCRIBED_PEERS_SUBNET_TOPIC: Result = try_create_int_gauge_vec( + "gossipsub_peers_per_subnet_topic_count", + "Peers subscribed per subnet topic", + &["subnet"] + ); + + pub static ref MESH_PEERS_PER_MAIN_TOPIC: Result = try_create_int_gauge_vec( + "gossipsub_mesh_peers_per_main_topic", + "Mesh peers per main topic", + &["topic_hash"] + ); + + pub static ref MESH_PEERS_PER_SUBNET_TOPIC: Result = try_create_int_gauge_vec( + "gossipsub_mesh_peers_per_subnet_topic", + "Mesh peers per subnet topic", + &["subnet"] + ); + + pub static ref AVG_GOSSIPSUB_PEER_SCORE_PER_MAIN_TOPIC: Result = try_create_int_gauge_vec( + "gossipsub_avg_peer_score_per_topic", + "Average peer's score per topic", + &["topic_hash"] + ); + + pub static ref AVG_GOSSIPSUB_PEER_SCORE_PER_SUBNET_TOPIC: Result = try_create_int_gauge_vec( + "gossipsub_avg_peer_score_per_subnet_topic", + "Average peer's score per subnet topic", + &["subnet"] + ); + + pub static ref ATTESTATIONS_PUBLISHED_PER_SUBNET_PER_SLOT: Result = try_create_int_counter_vec( + "gossipsub_attestations_published_per_subnet_per_slot", + "Failed attestation publishes per subnet", + &["subnet"] + ); +} + lazy_static! { /* * Gossip Rx */ pub static ref GOSSIP_BLOCKS_RX: Result = try_create_int_counter( - "network_gossip_blocks_rx_total", + "gossipsub_blocks_rx_total", "Count of gossip blocks received" ); pub static ref GOSSIP_UNAGGREGATED_ATTESTATIONS_RX: Result = try_create_int_counter( - "network_gossip_unaggregated_attestations_rx_total", + "gossipsub_unaggregated_attestations_rx_total", "Count of gossip unaggregated attestations received" ); pub static ref GOSSIP_AGGREGATED_ATTESTATIONS_RX: Result = try_create_int_counter( - "network_gossip_aggregated_attestations_rx_total", + "gossipsub_aggregated_attestations_rx_total", "Count of gossip aggregated attestations received" ); + /* * Gossip Tx */ pub static ref GOSSIP_BLOCKS_TX: Result = try_create_int_counter( - "network_gossip_blocks_tx_total", + "gossipsub_blocks_tx_total", "Count of gossip blocks transmitted" ); pub static ref GOSSIP_UNAGGREGATED_ATTESTATIONS_TX: Result = try_create_int_counter( - "network_gossip_unaggregated_attestations_tx_total", + "gossipsub_unaggregated_attestations_tx_total", "Count of gossip unaggregated attestations transmitted" ); pub static ref GOSSIP_AGGREGATED_ATTESTATIONS_TX: Result = try_create_int_counter( - "network_gossip_aggregated_attestations_tx_total", + "gossipsub_aggregated_attestations_tx_total", "Count of gossip aggregated attestations transmitted" ); @@ -38,11 +93,11 @@ lazy_static! { * Attestation subnet subscriptions */ pub static ref SUBNET_SUBSCRIPTION_REQUESTS: Result = try_create_int_counter( - "network_subnet_subscriptions_total", + "gossipsub_subnet_subscriptions_total", "Count of validator subscription requests." ); pub static ref SUBNET_SUBSCRIPTION_AGGREGATOR_REQUESTS: Result = try_create_int_counter( - "network_subnet_subscriptions_aggregator_total", + "gossipsub_subnet_subscriptions_aggregator_total", "Count of validator subscription requests where the subscriber is an aggregator." ); @@ -194,95 +249,95 @@ lazy_static! { * Attestation Errors */ pub static ref GOSSIP_ATTESTATION_ERROR_FUTURE_EPOCH: Result = try_create_int_counter( - "gossip_attestation_error_future_epoch", + "gossipsub_attestation_error_future_epoch", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_PAST_EPOCH: Result = try_create_int_counter( - "gossip_attestation_error_past_epoch", + "gossipsub_attestation_error_past_epoch", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_FUTURE_SLOT: Result = try_create_int_counter( - "gossip_attestation_error_future_slot", + "gossipsub_attestation_error_future_slot", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_PAST_SLOT: Result = try_create_int_counter( - "gossip_attestation_error_past_slot", + "gossipsub_attestation_error_past_slot", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_INVALID_SELECTION_PROOF: Result = try_create_int_counter( - "gossip_attestation_error_invalid_selection_proof", + "gossipsub_attestation_error_invalid_selection_proof", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_INVALID_SIGNATURE: Result = try_create_int_counter( - "gossip_attestation_error_invalid_signature", + "gossipsub_attestation_error_invalid_signature", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_EMPTY_AGGREGATION_BITFIELD: Result = try_create_int_counter( - "gossip_attestation_error_empty_aggregation_bitfield", + "gossipsub_attestation_error_empty_aggregation_bitfield", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_AGGREGATOR_PUBKEY_UNKNOWN: Result = try_create_int_counter( - "gossip_attestation_error_aggregator_pubkey_unknown", + "gossipsub_attestation_error_aggregator_pubkey_unknown", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_AGGREGATOR_NOT_IN_COMMITTEE: Result = try_create_int_counter( - "gossip_attestation_error_aggregator_not_in_committee", + "gossipsub_attestation_error_aggregator_not_in_committee", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_ATTESTATION_ALREADY_KNOWN: Result = try_create_int_counter( - "gossip_attestation_error_attestation_already_known", + "gossipsub_attestation_error_attestation_already_known", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_AGGREGATOR_ALREADY_KNOWN: Result = try_create_int_counter( - "gossip_attestation_error_aggregator_already_known", + "gossipsub_attestation_error_aggregator_already_known", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_PRIOR_ATTESTATION_KNOWN: Result = try_create_int_counter( - "gossip_attestation_error_prior_attestation_known", + "gossipsub_attestation_error_prior_attestation_known", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_VALIDATOR_INDEX_TOO_HIGH: Result = try_create_int_counter( - "gossip_attestation_error_validator_index_too_high", + "gossipsub_attestation_error_validator_index_too_high", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_UNKNOWN_HEAD_BLOCK: Result = try_create_int_counter( - "gossip_attestation_error_unknown_head_block", + "gossipsub_attestation_error_unknown_head_block", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_UNKNOWN_TARGET_ROOT: Result = try_create_int_counter( - "gossip_attestation_error_unknown_target_root", + "gossipsub_attestation_error_unknown_target_root", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_BAD_TARGET_EPOCH: Result = try_create_int_counter( - "gossip_attestation_error_bad_target_epoch", + "gossipsub_attestation_error_bad_target_epoch", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_NO_COMMITTEE_FOR_SLOT_AND_INDEX: Result = try_create_int_counter( - "gossip_attestation_error_no_committee_for_slot_and_index", + "gossipsub_attestation_error_no_committee_for_slot_and_index", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_NOT_EXACTLY_ONE_AGGREGATION_BIT_SET: Result = try_create_int_counter( - "gossip_attestation_error_not_exactly_one_aggregation_bit_set", + "gossipsub_attestation_error_not_exactly_one_aggregation_bit_set", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_ATTESTS_TO_FUTURE_BLOCK: Result = try_create_int_counter( - "gossip_attestation_error_attests_to_future_block", + "gossipsub_attestation_error_attests_to_future_block", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_INVALID_SUBNET_ID: Result = try_create_int_counter( - "gossip_attestation_error_invalid_subnet_id", + "gossipsub_attestation_error_invalid_subnet_id", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_INVALID_STATE_PROCESSING: Result = try_create_int_counter( - "gossip_attestation_error_invalid_state_processing", + "gossipsub_attestation_error_invalid_state_processing", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_INVALID_TOO_MANY_SKIPPED_SLOTS: Result = try_create_int_counter( - "gossip_attestation_error_invalid_too_many_skipped_slots", + "gossipsub_attestation_error_invalid_too_many_skipped_slots", "Count of a specific error type (see metric name)" ); pub static ref GOSSIP_ATTESTATION_ERROR_BEACON_CHAIN_ERROR: Result = try_create_int_counter( - "gossip_attestation_error_beacon_chain_error", + "gossipsub_attestation_error_beacon_chain_error", "Count of a specific error type (see metric name)" ); } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 55406f56a6..e07ebb166a 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -10,13 +10,14 @@ use eth2_libp2p::{ rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId}, Libp2pEvent, PeerAction, PeerRequestId, PubsubMessage, Request, Response, }; -use eth2_libp2p::{BehaviourEvent, MessageId, NetworkGlobals, PeerId}; +use eth2_libp2p::{ + types::GossipKind, BehaviourEvent, GossipTopic, MessageId, NetworkGlobals, PeerId, TopicHash, +}; use eth2_libp2p::{MessageAcceptance, Service as LibP2PService}; use futures::prelude::*; use rest_types::ValidatorSubscription; use slog::{debug, error, info, o, trace, warn}; -use std::sync::Arc; -use std::time::Duration; +use std::{collections::HashMap, sync::Arc, time::Duration}; use store::HotColdDB; use tokio::sync::mpsc; use tokio::time::Delay; @@ -24,6 +25,9 @@ use types::EthSpec; mod tests; +/// The interval (in seconds) that various network metrics will update. +const METRIC_UPDATE_INTERVAL: u64 = 1; + /// Types of messages that the network service can receive. #[derive(Debug)] pub enum NetworkMessage { @@ -91,6 +95,8 @@ pub struct NetworkService { network_globals: Arc>, /// A delay that expires when a new fork takes place. next_fork_update: Option, + /// A timer for updating various network metrics. + metrics_update: tokio::time::Interval, /// The logger for the network service. log: slog::Logger, } @@ -146,6 +152,9 @@ impl NetworkService { let attestation_service = AttestationService::new(beacon_chain.clone(), network_globals.clone(), &network_log); + // create a timer for updating network metrics + let metrics_update = tokio::time::interval(Duration::from_secs(METRIC_UPDATE_INTERVAL)); + // create the network service and spawn the task let network_log = network_log.new(o!("service" => "network")); let network_service = NetworkService { @@ -157,6 +166,7 @@ impl NetworkService { store, network_globals: network_globals.clone(), next_fork_update, + metrics_update, log: network_log, }; @@ -175,9 +185,8 @@ fn spawn_service( // spawn on the current executor executor.spawn_without_exit(async move { - // TODO: there is something with this code that prevents cargo fmt from doing anything at - // all. Ok, it is worse, the compiler doesn't show errors over this code beyond ast - // checking + + let mut metric_update_counter = 0; loop { // build the futures to check simultaneously tokio::select! { @@ -206,6 +215,17 @@ fn spawn_service( info!(service.log, "Network service shutdown"); return; } + _ = service.metrics_update.next() => { + // update various network metrics + metric_update_counter +=1; + if metric_update_counter* 1000 % T::EthSpec::default_spec().milliseconds_per_slot == 0 { + // if a slot has occurred, reset the metrics + let _ = metrics::ATTESTATIONS_PUBLISHED_PER_SUBNET_PER_SLOT + .as_ref() + .map(|gauge| gauge.reset()); + } + update_gossip_metrics::(&service.libp2p.swarm.gs()); + } // handle a message sent to the network Some(message) = service.network_recv.recv() => { match message { @@ -424,7 +444,11 @@ fn expose_publish_metrics(messages: &[PubsubMessage]) { for message in messages { match message { PubsubMessage::BeaconBlock(_) => metrics::inc_counter(&metrics::GOSSIP_BLOCKS_TX), - PubsubMessage::Attestation(_) => { + PubsubMessage::Attestation(subnet_id) => { + metrics::inc_counter_vec( + &metrics::ATTESTATIONS_PUBLISHED_PER_SUBNET_PER_SLOT, + &[&subnet_id.0.to_string()], + ); metrics::inc_counter(&metrics::GOSSIP_UNAGGREGATED_ATTESTATIONS_TX) } PubsubMessage::AggregateAndProofAttestation(_) => { @@ -448,3 +472,163 @@ fn expose_receive_metrics(message: &PubsubMessage) { _ => {} } } + +fn update_gossip_metrics(gossipsub: ð2_libp2p::Gossipsub) { + // Clear the metrics + let _ = metrics::PEERS_PER_PROTOCOL + .as_ref() + .map(|gauge| gauge.reset()); + let _ = metrics::PEERS_PER_PROTOCOL + .as_ref() + .map(|gauge| gauge.reset()); + let _ = metrics::MESH_PEERS_PER_MAIN_TOPIC + .as_ref() + .map(|gauge| gauge.reset()); + let _ = metrics::AVG_GOSSIPSUB_PEER_SCORE_PER_MAIN_TOPIC + .as_ref() + .map(|gauge| gauge.reset()); + let _ = metrics::AVG_GOSSIPSUB_PEER_SCORE_PER_SUBNET_TOPIC + .as_ref() + .map(|gauge| gauge.reset()); + + // reset the mesh peers, showing all subnets + for subnet_id in 0..T::default_spec().attestation_subnet_count { + let _ = metrics::get_int_gauge( + &metrics::MESH_PEERS_PER_SUBNET_TOPIC, + &[&subnet_id.to_string()], + ) + .map(|v| v.set(0)); + + let _ = metrics::get_int_gauge( + &metrics::GOSSIPSUB_SUBSCRIBED_SUBNET_TOPIC, + &[&subnet_id.to_string()], + ) + .map(|v| v.set(0)); + + let _ = metrics::get_int_gauge( + &metrics::GOSSIPSUB_SUBSCRIBED_PEERS_SUBNET_TOPIC, + &[&subnet_id.to_string()], + ) + .map(|v| v.set(0)); + } + + // Subnet topics subscribed to + for topic_hash in gossipsub.topics() { + if let Ok(topic) = GossipTopic::decode(topic_hash.as_str()) { + if let GossipKind::Attestation(subnet_id) = topic.kind() { + let _ = metrics::get_int_gauge( + &metrics::GOSSIPSUB_SUBSCRIBED_SUBNET_TOPIC, + &[&subnet_id.to_string()], + ) + .map(|v| v.set(1)); + } + } + } + + // Peers per subscribed subnet + let mut peers_per_topic: HashMap = HashMap::new(); + for (peer_id, topics) in gossipsub.all_peers() { + for topic_hash in topics { + *peers_per_topic.entry(topic_hash.clone()).or_default() += 1; + + if let Ok(topic) = GossipTopic::decode(topic_hash.as_str()) { + match topic.kind() { + GossipKind::Attestation(subnet_id) => { + if let Some(v) = metrics::get_int_gauge( + &metrics::GOSSIPSUB_SUBSCRIBED_PEERS_SUBNET_TOPIC, + &[&subnet_id.to_string()], + ) { + v.inc() + }; + + // average peer scores + if let Some(score) = gossipsub.peer_score(peer_id) { + if let Some(v) = metrics::get_int_gauge( + &metrics::AVG_GOSSIPSUB_PEER_SCORE_PER_SUBNET_TOPIC, + &[&subnet_id.to_string()], + ) { + v.add(score as i64) + }; + } + } + kind => { + // main topics + if let Some(score) = gossipsub.peer_score(peer_id) { + if let Some(v) = metrics::get_int_gauge( + &metrics::AVG_GOSSIPSUB_PEER_SCORE_PER_MAIN_TOPIC, + &[&format!("{:?}", kind)], + ) { + v.add(score as i64) + }; + } + } + } + } + } + } + // adjust to average scores by dividing by number of peers + for (topic_hash, peers) in peers_per_topic.iter() { + if let Ok(topic) = GossipTopic::decode(topic_hash.as_str()) { + match topic.kind() { + GossipKind::Attestation(subnet_id) => { + // average peer scores + if let Some(v) = metrics::get_int_gauge( + &metrics::AVG_GOSSIPSUB_PEER_SCORE_PER_SUBNET_TOPIC, + &[&subnet_id.to_string()], + ) { + v.set(v.get() / (*peers as i64)) + }; + } + kind => { + // main topics + if let Some(v) = metrics::get_int_gauge( + &metrics::AVG_GOSSIPSUB_PEER_SCORE_PER_MAIN_TOPIC, + &[&format!("{:?}", kind)], + ) { + v.set(v.get() / (*peers as i64)) + }; + } + } + } + } + + // mesh peers + for topic_hash in gossipsub.topics() { + let peers = gossipsub.mesh_peers(&topic_hash).count(); + if let Ok(topic) = GossipTopic::decode(topic_hash.as_str()) { + match topic.kind() { + GossipKind::Attestation(subnet_id) => { + if let Some(v) = metrics::get_int_gauge( + &metrics::MESH_PEERS_PER_SUBNET_TOPIC, + &[&subnet_id.to_string()], + ) { + v.set(peers as i64) + }; + } + kind => { + // main topics + if let Some(v) = metrics::get_int_gauge( + &metrics::MESH_PEERS_PER_MAIN_TOPIC, + &[&format!("{:?}", kind)], + ) { + v.set(peers as i64) + }; + } + } + } + } + + // protocol peers + let mut peers_per_protocol: HashMap = HashMap::new(); + for (_peer, protocol) in gossipsub.peer_protocol() { + *peers_per_protocol.entry(protocol.to_string()).or_default() += 1; + } + + for (protocol, peers) in peers_per_protocol.iter() { + if let Some(v) = + metrics::get_int_gauge(&metrics::PEERS_PER_PROTOCOL, &[&protocol.to_string()]) + { + v.set(*peers) + }; + } +} diff --git a/common/lighthouse_metrics/src/lib.rs b/common/lighthouse_metrics/src/lib.rs index d785e0b566..0a4251e06d 100644 --- a/common/lighthouse_metrics/src/lib.rs +++ b/common/lighthouse_metrics/src/lib.rs @@ -163,6 +163,22 @@ pub fn get_int_gauge(int_gauge_vec: &Result, name: &[&str]) -> Opti } } +/// If `int_gauge_vec.is_ok()`, sets the gauge with the given `name` to the given `value` +/// otherwise returns false. +pub fn set_int_gauge(int_gauge_vec: &Result, name: &[&str], value: i64) -> bool { + if let Ok(int_gauge_vec) = int_gauge_vec { + int_gauge_vec + .get_metric_with_label_values(name) + .map(|v| { + v.set(value); + true + }) + .unwrap_or_else(|_| false) + } else { + false + } +} + /// If `int_counter_vec.is_ok()`, returns a counter with the given `name`. pub fn get_int_counter( int_counter_vec: &Result,