Adds ENR "eth2" field and Fork logic to networking (#953)

* Merge #913

* Correct release tests

* Completed release test corrections

* Initial work on upgrading discovery

* Updates discovery to latest version

* Update ENR initialisation logic

* Remove debug statements

* Shifts timing units to slots

* Initial work

* Add initial fork versioning and EnrForkId

* Correct linking for EnrForkId

* Adds eth2 field to local ENR

* Initial work to eth2 field integration

* Integrate eth2 field into discovery

* temp commit

* Add a timer to adjust fork versions during a hard fork for the ENR
This commit is contained in:
Age Manning
2020-03-24 21:45:53 +11:00
committed by GitHub
parent af1c5c326c
commit 58111cddb2
17 changed files with 431 additions and 235 deletions

View File

@@ -15,7 +15,7 @@ use libp2p::{
use lru::LruCache;
use slog::{debug, o, warn};
use std::sync::Arc;
use types::EthSpec;
use types::{EnrForkId, EthSpec};
const MAX_IDENTIFY_ADDRESSES: usize = 20;
@@ -87,6 +87,96 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, T
}
}
/// Implements the combined behaviour for the libp2p service.
impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, TSpec> {
/* Pubsub behaviour functions */
/// Subscribes to a gossipsub topic.
pub fn subscribe(&mut self, topic: GossipTopic) -> bool {
if !self
.network_globals
.gossipsub_subscriptions
.read()
.contains(&topic)
{
self.network_globals
.gossipsub_subscriptions
.write()
.push(topic.clone());
}
self.gossipsub.subscribe(topic.into())
}
/// Unsubscribe from a gossipsub topic.
pub fn unsubscribe(&mut self, topic: GossipTopic) -> bool {
let pos = self
.network_globals
.gossipsub_subscriptions
.read()
.iter()
.position(|s| s == &topic);
if let Some(pos) = pos {
self.network_globals
.gossipsub_subscriptions
.write()
.swap_remove(pos);
}
self.gossipsub.unsubscribe(topic.into())
}
/// Publishes a list of messages on the pubsub (gossipsub) behaviour, choosing the encoding.
pub fn publish(&mut self, messages: Vec<PubsubMessage<TSpec>>) {
for message in messages {
for topic in message.topics() {
let message_data = message.encode();
self.gossipsub.publish(&topic.into(), message_data);
}
}
}
/// Forwards a message that is waiting in gossipsub's mcache. Messages are only propagated
/// once validated by the beacon chain.
pub fn propagate_message(&mut self, propagation_source: &PeerId, message_id: MessageId) {
self.gossipsub
.propagate_message(&message_id, propagation_source);
}
/* Eth2 RPC behaviour functions */
/// Sends an RPC Request/Response via the RPC protocol.
pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent<TSpec>) {
self.eth2_rpc.send_rpc(peer_id, rpc_event);
}
/* Discovery / Peer management functions */
/// Notify discovery that the peer has been banned.
pub fn peer_banned(&mut self, peer_id: PeerId) {
self.discovery.peer_banned(peer_id);
}
/// Notify discovery that the peer has been unbanned.
pub fn peer_unbanned(&mut self, peer_id: &PeerId) {
self.discovery.peer_unbanned(peer_id);
}
/// Returns an iterator over all enr entries in the DHT.
pub fn enr_entries(&mut self) -> impl Iterator<Item = &Enr> {
self.discovery.enr_entries()
}
/// Add an ENR to the routing table of the discovery mechanism.
pub fn add_enr(&mut self, enr: Enr) {
self.discovery.add_enr(enr);
}
/// Updates the local ENR's "eth2" field with the latest EnrForkId.
pub fn update_fork_version(&mut self, enr_fork_id: EnrForkId) {
self.discovery.update_eth2_enr(enr_fork_id);
// TODO: Handle gossipsub fork update
}
}
// Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour
impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec>
NetworkBehaviourEventProcess<GossipsubEvent> for Behaviour<TSubstream, TSpec>
@@ -194,95 +284,6 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> NetworkBehaviourEventPr
}
}
/// Implements the combined behaviour for the libp2p service.
impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, TSpec> {
/* Pubsub behaviour functions */
/// Subscribes to a gossipsub topic.
pub fn subscribe(&mut self, topic: GossipTopic) -> bool {
if !self
.network_globals
.gossipsub_subscriptions
.read()
.contains(&topic)
{
self.network_globals
.gossipsub_subscriptions
.write()
.push(topic.clone());
}
self.gossipsub.subscribe(topic.into())
}
/// Unsubscribe from a gossipsub topic.
pub fn unsubscribe(&mut self, topic: GossipTopic) -> bool {
let pos = self
.network_globals
.gossipsub_subscriptions
.read()
.iter()
.position(|s| s == &topic);
if let Some(pos) = pos {
self.network_globals
.gossipsub_subscriptions
.write()
.swap_remove(pos);
}
self.gossipsub.unsubscribe(topic.into())
}
/// Publishes a list of messages on the pubsub (gossipsub) behaviour, choosing the encoding.
pub fn publish(&mut self, messages: Vec<PubsubMessage<TSpec>>) {
for message in messages {
for topic in message.topics() {
let message_data = message.encode();
self.gossipsub.publish(&topic.into(), message_data);
}
}
}
/// Forwards a message that is waiting in gossipsub's mcache. Messages are only propagated
/// once validated by the beacon chain.
pub fn propagate_message(&mut self, propagation_source: &PeerId, message_id: MessageId) {
self.gossipsub
.propagate_message(&message_id, propagation_source);
}
/* Eth2 RPC behaviour functions */
/// Sends an RPC Request/Response via the RPC protocol.
pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent<TSpec>) {
self.eth2_rpc.send_rpc(peer_id, rpc_event);
}
/* Discovery / Peer management functions */
/// The current number of connected libp2p peers.
pub fn connected_peers(&self) -> usize {
self.network_globals.connected_peers()
}
/// Notify discovery that the peer has been banned.
pub fn peer_banned(&mut self, peer_id: PeerId) {
self.discovery.peer_banned(peer_id);
}
/// Notify discovery that the peer has been unbanned.
pub fn peer_unbanned(&mut self, peer_id: &PeerId) {
self.discovery.peer_unbanned(peer_id);
}
/// Returns an iterator over all enr entries in the DHT.
pub fn enr_entries(&mut self) -> impl Iterator<Item = &Enr> {
self.discovery.enr_entries()
}
/// Add an ENR to the routing table of the discovery mechanism.
pub fn add_enr(&mut self, enr: Enr) {
self.discovery.add_enr(enr);
}
}
/// The types of events than can be obtained from polling the behaviour.
pub enum BehaviourEvent<TSpec: EthSpec> {
/// A received RPC event and the peer that it was received from.

View File

@@ -7,6 +7,7 @@ use serde_derive::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::path::PathBuf;
use std::time::Duration;
use types::EnrForkId;
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
@@ -63,6 +64,9 @@ pub struct Config {
/// List of extra topics to initially subscribe to as strings.
pub topics: Vec<GossipTopic>,
/// The initial ENR fork id.
pub enr_fork_id: EnrForkId,
/// Introduces randomization in network propagation of messages. This should only be set for
/// testing purposes and will likely be removed in future versions.
// TODO: Remove this functionality for mainnet
@@ -132,6 +136,7 @@ impl Default for Config {
libp2p_nodes: vec![],
client_version: version::version(),
topics,
enr_fork_id: EnrForkId::default(),
propagation_percentage: None,
}
}

View File

@@ -0,0 +1,121 @@
use super::ENR_FILENAME;
use crate::Enr;
use crate::NetworkConfig;
use libp2p::core::identity::Keypair;
use libp2p::discv5::enr::{CombinedKey, EnrBuilder};
use slog::{debug, warn};
use ssz::Encode;
use std::convert::TryInto;
use std::fs::File;
use std::io::prelude::*;
use std::path::Path;
use std::str::FromStr;
/// Loads an ENR from file if it exists and matches the current NodeId and sequence number. If none
/// exists, generates a new one.
///
/// If an ENR exists, with the same NodeId, this function checks to see if the loaded ENR from
/// disk is suitable to use, otherwise we increment our newly generated ENR's sequence number.
pub fn build_or_load_enr(
local_key: Keypair,
config: &NetworkConfig,
log: &slog::Logger,
) -> Result<Enr, String> {
// Build the local ENR.
// Note: Discovery should update the ENR record's IP to the external IP as seen by the
// majority of our peers, if the CLI doesn't expressly forbid it.
let enr_key: CombinedKey = local_key
.try_into()
.map_err(|_| "Invalid key type for ENR records")?;
let mut local_enr = build_enr(&enr_key, config)?;
let enr_f = config.network_dir.join(ENR_FILENAME);
if let Ok(mut enr_file) = File::open(enr_f.clone()) {
let mut enr_string = String::new();
match enr_file.read_to_string(&mut enr_string) {
Err(_) => debug!(log, "Could not read ENR from file"),
Ok(_) => {
match Enr::from_str(&enr_string) {
Ok(disk_enr) => {
// if the same node id, then we may need to update our sequence number
if local_enr.node_id() == disk_enr.node_id() {
if compare_enr(&local_enr, &disk_enr) {
debug!(log, "ENR loaded from disk"; "file" => format!("{:?}", enr_f));
// the stored ENR has the same configuration, use it
return Ok(disk_enr);
}
// same node id, different configuration - update the sequence number
let new_seq_no = disk_enr.seq().checked_add(1).ok_or_else(|| "ENR sequence number on file is too large. Remove it to generate a new NodeId")?;
local_enr.set_seq(new_seq_no, &enr_key).map_err(|e| {
format!("Could not update ENR sequence number: {:?}", e)
})?;
debug!(log, "ENR sequence number increased"; "seq" => new_seq_no);
}
}
Err(e) => {
warn!(log, "ENR from file could not be decoded"; "error" => format!("{:?}", e));
}
}
}
}
}
save_enr_to_disk(&config.network_dir, &local_enr, log);
Ok(local_enr)
}
/// Builds a lighthouse ENR given a `NetworkConfig`.
fn build_enr(enr_key: &CombinedKey, config: &NetworkConfig) -> Result<Enr, String> {
let mut builder = EnrBuilder::new("v4");
if let Some(enr_address) = config.enr_address {
builder.ip(enr_address);
}
if let Some(udp_port) = config.enr_udp_port {
builder.udp(udp_port);
}
// we always give it our listening tcp port
// TODO: Add uPnP support to map udp and tcp ports
let tcp_port = config.enr_tcp_port.unwrap_or_else(|| config.libp2p_port);
builder.tcp(tcp_port);
// set the `eth2` field on our ENR
builder.add_value("eth2".into(), config.enr_fork_id.as_ssz_bytes());
builder
.tcp(config.libp2p_port)
.build(enr_key)
.map_err(|e| format!("Could not build Local ENR: {:?}", e))
}
/// Defines the conditions under which we use the locally built ENR or the one stored on disk.
/// If this function returns true, we use the `disk_enr`.
fn compare_enr(local_enr: &Enr, disk_enr: &Enr) -> bool {
// take preference over disk_enr address if one is not specified
(local_enr.ip().is_none() || local_enr.ip() == disk_enr.ip())
// tcp ports must match
&& local_enr.tcp() == disk_enr.tcp()
&& local_enr.get("eth2") == disk_enr.get("eth2")
// take preference over disk udp port if one is not specified
&& (local_enr.udp().is_none() || local_enr.udp() == disk_enr.udp())
}
/// Saves an ENR to disk
pub fn save_enr_to_disk(dir: &Path, enr: &Enr, log: &slog::Logger) {
let _ = std::fs::create_dir_all(dir);
match File::create(dir.join(Path::new(ENR_FILENAME)))
.and_then(|mut f| f.write_all(&enr.to_base64().as_bytes()))
{
Ok(_) => {
debug!(log, "ENR written to disk");
}
Err(e) => {
warn!(
log,
"Could not write ENR to file"; "file" => format!("{:?}{:?}",dir, ENR_FILENAME), "error" => format!("{}", e)
);
}
}
}

View File

@@ -1,29 +1,25 @@
///! This manages the discovery and management of peers.
mod enr_helpers;
use crate::metrics;
use crate::Enr;
use crate::{error, NetworkConfig, NetworkGlobals, PeerInfo};
/// This manages the discovery and management of peers.
///
/// Currently using discv5 for peer discovery.
///
use futures::prelude::*;
use libp2p::core::{identity::Keypair, ConnectedPoint, Multiaddr, PeerId};
use libp2p::discv5::enr::{CombinedKey, EnrBuilder, NodeId};
use libp2p::discv5::enr::NodeId;
use libp2p::discv5::{Discv5, Discv5Event};
use libp2p::multiaddr::Protocol;
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler};
use slog::{debug, info, warn};
use ssz::Encode;
use std::collections::HashSet;
use std::convert::TryInto;
use std::fs::File;
use std::io::prelude::*;
use std::net::SocketAddr;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::timer::Delay;
use types::EthSpec;
use types::{EnrForkId, EthSpec};
/// Maximum seconds before searching for extra peers.
const MAX_TIME_BETWEEN_PEER_SEARCHES: u64 = 120;
@@ -76,7 +72,7 @@ impl<TSubstream, TSpec: EthSpec> Discovery<TSubstream, TSpec> {
let log = log.clone();
// checks if current ENR matches that found on disk
let local_enr = load_enr(local_key.clone(), config, &log)?;
let local_enr = enr_helpers::build_or_load_enr(local_key.clone(), config, &log)?;
*network_globals.local_enr.write() = Some(local_enr.clone());
@@ -105,7 +101,13 @@ impl<TSubstream, TSpec: EthSpec> Discovery<TSubstream, TSpec> {
"node_id" => format!("{}", bootnode_enr.node_id()),
"peer_id" => format!("{}", bootnode_enr.peer_id())
);
discovery.add_enr(bootnode_enr);
let _ = discovery.add_enr(bootnode_enr).map_err(|e| {
warn!(
log,
"Could not add peer to the local routing table";
"error" => format!("{}", e)
)
});
}
Ok(Self {
@@ -135,7 +137,13 @@ impl<TSubstream, TSpec: EthSpec> Discovery<TSubstream, TSpec> {
/// Add an ENR to the routing table of the discovery mechanism.
pub fn add_enr(&mut self, enr: Enr) {
self.discovery.add_enr(enr);
let _ = self.discovery.add_enr(enr).map_err(|e| {
warn!(
self.log,
"Could not add peer to the local routing table";
"error" => format!("{}", e)
)
});
}
/// The peer has been banned. Add this peer to the banned list to prevent any future
@@ -154,6 +162,34 @@ impl<TSubstream, TSpec: EthSpec> Discovery<TSubstream, TSpec> {
self.discovery.enr_entries()
}
/// Updates the `eth2` field of our local ENR.
pub fn update_eth2_enr(&mut self, enr_fork_id: EnrForkId) {
// to avoid having a reference to the spec constant, for the logging we assume
// FAR_FUTURE_EPOCH is u64::max_value()
let next_fork_epoch_log = if enr_fork_id.next_fork_epoch == u64::max_value() {
String::from("No other fork")
} else {
format!("{:?}", enr_fork_id.next_fork_epoch)
};
info!(self.log, "Updating the ENR fork version";
"fork_digest" => format!("{:?}", enr_fork_id.fork_digest),
"next_fork_version" => format!("{:?}", enr_fork_id.next_fork_version),
"next_fork_epoch" => next_fork_epoch_log,
);
let _ = self
.discovery
.enr_insert("eth2".into(), enr_fork_id.as_ssz_bytes())
.map_err(|e| {
warn!(
self.log,
"Could not update eth2 ENR field";
"error" => format!("{:?}", e)
)
});
}
/// Search for new peers using the underlying discovery mechanism.
fn find_peers(&mut self) {
// pick a random NodeId
@@ -268,13 +304,14 @@ where
let mut address = Multiaddr::from(socket.ip());
address.push(Protocol::Tcp(self.tcp_port));
let enr = self.discovery.local_enr();
save_enr_to_disc(Path::new(&self.enr_dir), enr, &self.log);
enr_helpers::save_enr_to_disk(Path::new(&self.enr_dir), enr, &self.log);
return Async::Ready(NetworkBehaviourAction::ReportObservedAddr {
address,
});
}
Discv5Event::FindNodeResult { closer_peers, .. } => {
// TODO: Modify once ENR predicate search is available
debug!(self.log, "Discovery query completed"; "peers_found" => closer_peers.len());
// update the time to the next query
if self.past_discovery_delay < MAX_TIME_BETWEEN_PEER_SEARCHES {
@@ -320,94 +357,3 @@ where
Async::NotReady
}
}
/// Loads an ENR from file if it exists and matches the current NodeId and sequence number. If none
/// exists, generates a new one.
///
/// If an ENR exists, with the same NodeId and IP address, we use the disk-generated one as its
/// ENR sequence will be equal or higher than a newly generated one.
fn load_enr(local_key: Keypair, config: &NetworkConfig, log: &slog::Logger) -> Result<Enr, String> {
// Build the local ENR.
// Note: Discovery should update the ENR record's IP to the external IP as seen by the
// majority of our peers, if the CLI doesn't expressly forbid it.
let enr_key: CombinedKey = local_key
.try_into()
.map_err(|_| "Invalid key type for ENR records")?;
let mut local_enr = {
let mut builder = EnrBuilder::new("v4");
if let Some(enr_address) = config.enr_address {
builder.ip(enr_address);
}
if let Some(udp_port) = config.enr_udp_port {
builder.udp(udp_port);
}
// we always give it our listening tcp port
// TODO: Add uPnP support to map udp and tcp ports
let tcp_port = config.enr_tcp_port.unwrap_or_else(|| config.libp2p_port);
builder.tcp(tcp_port);
builder
.tcp(config.libp2p_port)
.build(&enr_key)
.map_err(|e| format!("Could not build Local ENR: {:?}", e))?
};
let enr_f = config.network_dir.join(ENR_FILENAME);
if let Ok(mut enr_file) = File::open(enr_f.clone()) {
let mut enr_string = String::new();
match enr_file.read_to_string(&mut enr_string) {
Err(_) => debug!(log, "Could not read ENR from file"),
Ok(_) => {
match Enr::from_str(&enr_string) {
Ok(enr) => {
let tcp_port = config.enr_tcp_port.unwrap_or_else(|| config.libp2p_port);
if enr.node_id() == local_enr.node_id() {
if (config.enr_address.is_none()
|| enr.ip().map(Into::into) == config.enr_address)
&& enr.tcp() == Some(tcp_port)
&& (config.enr_udp_port.is_none()
|| enr.udp() == config.enr_udp_port)
{
debug!(log, "ENR loaded from file"; "file" => format!("{:?}", enr_f));
// the stored ENR has the same configuration, use it
return Ok(enr);
}
// same node id, different configuration - update the sequence number
let new_seq_no = enr.seq().checked_add(1).ok_or_else(|| "ENR sequence number on file is too large. Remove it to generate a new NodeId")?;
local_enr.set_seq(new_seq_no, &enr_key).map_err(|e| {
format!("Could not update ENR sequence number: {:?}", e)
})?;
debug!(log, "ENR sequence number increased"; "seq" => new_seq_no);
}
}
Err(e) => {
warn!(log, "ENR from file could not be decoded"; "error" => format!("{:?}", e));
}
}
}
}
}
save_enr_to_disc(&config.network_dir, &local_enr, log);
Ok(local_enr)
}
fn save_enr_to_disc(dir: &Path, enr: &Enr, log: &slog::Logger) {
let _ = std::fs::create_dir_all(dir);
match File::create(dir.join(Path::new(ENR_FILENAME)))
.and_then(|mut f| f.write_all(&enr.to_base64().as_bytes()))
{
Ok(_) => {
debug!(log, "ENR written to disk");
}
Err(e) => {
warn!(
log,
"Could not write ENR to file"; "file" => format!("{:?}{:?}",dir, ENR_FILENAME), "error" => format!("{}", e)
);
}
}
}