Update to latest master

This commit is contained in:
Age Manning
2019-08-25 09:06:26 +10:00
56 changed files with 1751 additions and 944 deletions

View File

@@ -78,6 +78,10 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
log: behaviour_log,
})
}
pub fn discovery(&self) -> &Discovery<TSubstream> {
&self.discovery
}
}
// Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour
@@ -87,7 +91,7 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<GossipsubE
fn inject_event(&mut self, event: GossipsubEvent) {
match event {
GossipsubEvent::Message(gs_msg) => {
trace!(self.log, "Received GossipEvent"; "msg" => format!("{:?}", gs_msg));
trace!(self.log, "Received GossipEvent");
let msg = PubsubMessage::from_topics(&gs_msg.topics, gs_msg.data);

View File

@@ -1,3 +1,4 @@
use crate::metrics;
use crate::{error, NetworkConfig};
/// This manages the discovery and management of peers.
///
@@ -102,6 +103,10 @@ impl<TSubstream> Discovery<TSubstream> {
})
}
pub fn local_enr(&self) -> &Enr {
self.discovery.local_enr()
}
/// Manually search for peers. This restarts the discovery round, sparking multiple rapid
/// queries.
pub fn discover_peers(&mut self) {
@@ -119,6 +124,11 @@ impl<TSubstream> Discovery<TSubstream> {
self.connected_peers.len()
}
/// The current number of connected libp2p peers.
pub fn connected_peer_set(&self) -> &HashSet<PeerId> {
&self.connected_peers
}
/// Search for new peers using the underlying discovery mechanism.
fn find_peers(&mut self) {
// pick a random NodeId
@@ -159,10 +169,16 @@ where
fn inject_connected(&mut self, peer_id: PeerId, _endpoint: ConnectedPoint) {
self.connected_peers.insert(peer_id);
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
metrics::set_gauge(&metrics::PEERS_CONNECTED, self.connected_peers() as i64);
}
fn inject_disconnected(&mut self, peer_id: &PeerId, _endpoint: ConnectedPoint) {
self.connected_peers.remove(peer_id);
metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT);
metrics::set_gauge(&metrics::PEERS_CONNECTED, self.connected_peers() as i64);
}
fn inject_replaced(
@@ -217,6 +233,7 @@ where
}
Discv5Event::SocketUpdated(socket) => {
info!(self.log, "Address updated"; "IP" => format!("{}",socket.ip()));
metrics::inc_counter(&metrics::ADDRESS_UPDATE_COUNT);
let mut address = Multiaddr::from(socket.ip());
address.push(Protocol::Tcp(self.tcp_port));
let enr = self.discovery.local_enr();

View File

@@ -2,21 +2,29 @@
/// all required libp2p functionality.
///
/// This crate builds and manages the libp2p services required by the beacon node.
#[macro_use]
extern crate lazy_static;
pub mod behaviour;
mod config;
mod discovery;
pub mod error;
mod metrics;
pub mod rpc;
mod service;
pub use behaviour::PubsubMessage;
pub use config::{Config as NetworkConfig, *};
pub use config::{
Config as NetworkConfig, BEACON_ATTESTATION_TOPIC, BEACON_BLOCK_TOPIC, SHARD_TOPIC_PREFIX,
TOPIC_ENCODING_POSTFIX, TOPIC_PREFIX,
};
pub use libp2p::enr::Enr;
pub use libp2p::gossipsub::{Topic, TopicHash};
pub use libp2p::multiaddr;
pub use libp2p::Multiaddr;
pub use libp2p::{
gossipsub::{GossipsubConfig, GossipsubConfigBuilder},
PeerId,
PeerId, Swarm,
};
pub use rpc::RPCEvent;
pub use service::Libp2pEvent;

View File

@@ -0,0 +1,20 @@
pub use lighthouse_metrics::*;
lazy_static! {
pub static ref ADDRESS_UPDATE_COUNT: Result<IntCounter> = try_create_int_counter(
"libp2p_address_update_total",
"Count of libp2p socked updated events (when our view of our IP address has changed)"
);
pub static ref PEERS_CONNECTED: Result<IntGauge> = try_create_int_gauge(
"libp2p_peer_connected_peers_total",
"Count of libp2p peers currently connected"
);
pub static ref PEER_CONNECT_EVENT_COUNT: Result<IntCounter> = try_create_int_counter(
"libp2p_peer_connect_event_total",
"Count of libp2p peer connect events (not the current number of connected peers)"
);
pub static ref PEER_DISCONNECT_EVENT_COUNT: Result<IntCounter> = try_create_int_counter(
"libp2p_peer_disconnect_event_total",
"Count of libp2p peer disconnect events"
);
}

View File

@@ -16,7 +16,7 @@ use libp2p::core::{
upgrade::{InboundUpgradeExt, OutboundUpgradeExt},
};
use libp2p::{core, secio, PeerId, Swarm, Transport};
use slog::{debug, info, trace, warn};
use slog::{crit, debug, info, trace, warn};
use std::fs::File;
use std::io::prelude::*;
use std::io::{Error, ErrorKind};
@@ -33,7 +33,7 @@ pub struct Service {
//TODO: Make this private
pub swarm: Swarm<Libp2pStream, Libp2pBehaviour>,
/// This node's PeerId.
_local_peer_id: PeerId,
pub local_peer_id: PeerId,
/// The libp2p logger handle.
pub log: slog::Logger,
}
@@ -68,10 +68,15 @@ impl Service {
log_address.push(Protocol::P2p(local_peer_id.clone().into()));
info!(log, "Listening established"; "Address" => format!("{}", log_address));
}
Err(err) => warn!(
log,
"Failed to listen on address"; "Address" => format!("{}", listen_multiaddr), "Error" => format!("{:?}", err)
),
Err(err) => {
crit!(
log,
"Unable to listen on libp2p address";
"error" => format!("{:?}", err),
"listen_multiaddr" => format!("{}", listen_multiaddr),
);
return Err("Libp2p was unable to listen on the given listen address.".into());
}
};
// attempt to connect to user-input libp2p nodes
@@ -126,7 +131,7 @@ impl Service {
info!(log, "Subscribed to topics"; "Topics" => format!("{:?}", subscribed_topics.iter().map(|t| format!("{}", t)).collect::<Vec<String>>()));
Ok(Service {
_local_peer_id: local_peer_id,
local_peer_id,
swarm,
log,
})