Update to latest libp2p (#1810)

## Description

Updates to the latest libp2p and includes gossipsub updates. 

Of particular note is the limitation of a single topic per gossipsub message.

Co-authored-by: blacktemplar <blacktemplar@a1.net>
This commit is contained in:
Age Manning
2020-10-23 03:01:31 +00:00
parent acd49d988d
commit c49dd94e20
8 changed files with 176 additions and 186 deletions

View File

@@ -42,7 +42,7 @@ regex = "1.3.9"
[dependencies.libp2p]
#version = "0.23.0"
git = "https://github.com/sigp/rust-libp2p"
rev = "a731aa803d986977c25a77ed2b002d9578f7377c"
rev = "e42d105cd776b50fc8ef5d726a9e2c799196bd80"
default-features = false
features = ["websocket", "identify", "mplex", "noise", "gossipsub", "dns", "tcp-tokio"]

View File

@@ -90,8 +90,6 @@ pub enum BehaviourEvent<TSpec: EthSpec> {
id: MessageId,
/// The peer from which we received this message, not the peer that published it.
source: PeerId,
/// The topics that this message was sent on.
topics: Vec<TopicHash>,
/// The message itself.
message: PubsubMessage<TSpec>,
},
@@ -304,35 +302,34 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
/// Publishes a list of messages on the pubsub (gossipsub) behaviour, choosing the encoding.
pub fn publish(&mut self, messages: Vec<PubsubMessage<TSpec>>) {
for message in messages {
for topic in message.topics(GossipEncoding::default(), self.enr_fork_id.fork_digest) {
match message.encode(GossipEncoding::default()) {
Ok(message_data) => {
if let Err(e) = self.gossipsub.publish(topic.clone().into(), message_data) {
slog::warn!(self.log, "Could not publish message"; "error" => format!("{:?}", e));
let topic = message.topic(GossipEncoding::default(), self.enr_fork_id.fork_digest);
match message.encode(GossipEncoding::default()) {
Ok(message_data) => {
if let Err(e) = self.gossipsub.publish(topic.clone().into(), message_data) {
slog::warn!(self.log, "Could not publish message"; "error" => format!("{:?}", e));
// add to metrics
match topic.kind() {
GossipKind::Attestation(subnet_id) => {
if let Some(v) = metrics::get_int_gauge(
&metrics::FAILED_ATTESTATION_PUBLISHES_PER_SUBNET,
&[&subnet_id.to_string()],
) {
v.inc()
};
}
kind => {
if let Some(v) = metrics::get_int_gauge(
&metrics::FAILED_PUBLISHES_PER_MAIN_TOPIC,
&[&format!("{:?}", kind)],
) {
v.inc()
};
}
// add to metrics
match topic.kind() {
GossipKind::Attestation(subnet_id) => {
if let Some(v) = metrics::get_int_gauge(
&metrics::FAILED_ATTESTATION_PUBLISHES_PER_SUBNET,
&[&subnet_id.to_string()],
) {
v.inc()
};
}
kind => {
if let Some(v) = metrics::get_int_gauge(
&metrics::FAILED_PUBLISHES_PER_MAIN_TOPIC,
&[&format!("{:?}", kind)],
) {
v.inc()
};
}
}
}
Err(e) => crit!(self.log, "Could not publish message"; "error" => e),
}
Err(e) => crit!(self.log, "Could not publish message"; "error" => e),
}
}
}
@@ -537,7 +534,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
} => {
// Note: We are keeping track here of the peer that sent us the message, not the
// peer that originally published the message.
match PubsubMessage::decode(&gs_msg.topics, gs_msg.data()) {
match PubsubMessage::decode(&gs_msg.topic, gs_msg.data()) {
Err(e) => {
debug!(self.log, "Could not decode gossipsub message"; "error" => e);
//reject the message
@@ -554,7 +551,6 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
self.add_event(BehaviourEvent::PubsubMessage {
id,
source: propagation_source,
topics: gs_msg.topics,
message: msg,
});
}

View File

@@ -9,7 +9,7 @@ use crate::EnrExt;
use crate::{NetworkConfig, NetworkGlobals, PeerAction};
use futures::prelude::*;
use libp2p::core::{
identity::Keypair, multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::boxed::Boxed,
identity::Keypair, multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::Boxed,
};
use libp2p::{
core, noise,
@@ -20,7 +20,6 @@ use slog::{crit, debug, info, o, trace, warn};
use ssz::Decode;
use std::fs::File;
use std::io::prelude::*;
use std::io::{Error, ErrorKind};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
@@ -323,9 +322,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
/// The implementation supports TCP/IP, WebSockets over TCP/IP, noise as the encryption layer, and
/// mplex as the multiplexing layer.
fn build_transport(
local_private_key: Keypair,
) -> Result<Boxed<(PeerId, StreamMuxerBox), Error>, Error> {
fn build_transport(local_private_key: Keypair) -> std::io::Result<Boxed<(PeerId, StreamMuxerBox)>> {
let transport = libp2p::tcp::TokioTcpConfig::new().nodelay(true);
let transport = libp2p::dns::DnsConfig::new(transport)?;
#[cfg(feature = "libp2p-websocket")]
@@ -338,10 +335,7 @@ fn build_transport(
.upgrade(core::upgrade::Version::V1)
.authenticate(generate_noise_config(&local_private_key))
.multiplex(libp2p::mplex::MplexConfig::new())
.map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer)))
.timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(10))
.map_err(|err| Error::new(ErrorKind::Other, err))
.boxed())
}

View File

@@ -74,10 +74,10 @@ fn decompress_snappy(data: &[u8]) -> Result<Vec<u8>, String> {
}
impl<T: EthSpec> PubsubMessage<T> {
/// Returns the topics that each pubsub message will be sent across, given a supported
/// Returns the topic that each pubsub message will be sent across, given a supported
/// gossipsub encoding and fork version.
pub fn topics(&self, encoding: GossipEncoding, fork_version: [u8; 4]) -> Vec<GossipTopic> {
vec![GossipTopic::new(self.kind(), encoding, fork_version)]
pub fn topic(&self, encoding: GossipEncoding, fork_version: [u8; 4]) -> GossipTopic {
GossipTopic::new(self.kind(), encoding, fork_version)
}
/// Returns the kind of gossipsub topic associated with the message.
@@ -104,68 +104,54 @@ impl<T: EthSpec> PubsubMessage<T> {
* Also note that a message can be associated with many topics. As soon as one of the topics is
* known we match. If none of the topics are known we return an unknown state.
*/
pub fn decode(topics: &[TopicHash], data: &MessageData) -> Result<Self, String> {
let mut unknown_topics = Vec::new();
for topic in topics {
match GossipTopic::decode(topic.as_str()) {
Err(_) => {
unknown_topics.push(topic);
continue;
}
Ok(gossip_topic) => {
let decompressed_data = match gossip_topic.encoding() {
GossipEncoding::SSZSnappy => data.decompressed.as_ref()?.as_slice(),
};
// the ssz decoders
match gossip_topic.kind() {
GossipKind::BeaconAggregateAndProof => {
let agg_and_proof =
SignedAggregateAndProof::from_ssz_bytes(decompressed_data)
.map_err(|e| format!("{:?}", e))?;
return Ok(PubsubMessage::AggregateAndProofAttestation(Box::new(
agg_and_proof,
)));
}
GossipKind::Attestation(subnet_id) => {
let attestation = Attestation::from_ssz_bytes(decompressed_data)
pub fn decode(topic: &TopicHash, data: &MessageData) -> Result<Self, String> {
match GossipTopic::decode(topic.as_str()) {
Err(_) => Err(format!("Unknown topic: {}", topic)),
Ok(gossip_topic) => {
let decompressed_data = match gossip_topic.encoding() {
GossipEncoding::SSZSnappy => data.decompressed.as_ref()?.as_slice(),
};
// the ssz decoders
match gossip_topic.kind() {
GossipKind::BeaconAggregateAndProof => {
let agg_and_proof =
SignedAggregateAndProof::from_ssz_bytes(decompressed_data)
.map_err(|e| format!("{:?}", e))?;
return Ok(PubsubMessage::Attestation(Box::new((
*subnet_id,
attestation,
))));
}
GossipKind::BeaconBlock => {
let beacon_block = SignedBeaconBlock::from_ssz_bytes(decompressed_data)
.map_err(|e| format!("{:?}", e))?;
return Ok(PubsubMessage::BeaconBlock(Box::new(beacon_block)));
}
GossipKind::VoluntaryExit => {
let voluntary_exit =
SignedVoluntaryExit::from_ssz_bytes(decompressed_data)
.map_err(|e| format!("{:?}", e))?;
return Ok(PubsubMessage::VoluntaryExit(Box::new(voluntary_exit)));
}
GossipKind::ProposerSlashing => {
let proposer_slashing =
ProposerSlashing::from_ssz_bytes(decompressed_data)
.map_err(|e| format!("{:?}", e))?;
return Ok(PubsubMessage::ProposerSlashing(Box::new(
proposer_slashing,
)));
}
GossipKind::AttesterSlashing => {
let attester_slashing =
AttesterSlashing::from_ssz_bytes(decompressed_data)
.map_err(|e| format!("{:?}", e))?;
return Ok(PubsubMessage::AttesterSlashing(Box::new(
attester_slashing,
)));
}
Ok(PubsubMessage::AggregateAndProofAttestation(Box::new(
agg_and_proof,
)))
}
GossipKind::Attestation(subnet_id) => {
let attestation = Attestation::from_ssz_bytes(decompressed_data)
.map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::Attestation(Box::new((
*subnet_id,
attestation,
))))
}
GossipKind::BeaconBlock => {
let beacon_block = SignedBeaconBlock::from_ssz_bytes(decompressed_data)
.map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::BeaconBlock(Box::new(beacon_block)))
}
GossipKind::VoluntaryExit => {
let voluntary_exit = SignedVoluntaryExit::from_ssz_bytes(decompressed_data)
.map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::VoluntaryExit(Box::new(voluntary_exit)))
}
GossipKind::ProposerSlashing => {
let proposer_slashing = ProposerSlashing::from_ssz_bytes(decompressed_data)
.map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::ProposerSlashing(Box::new(proposer_slashing)))
}
GossipKind::AttesterSlashing => {
let attester_slashing = AttesterSlashing::from_ssz_bytes(decompressed_data)
.map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::AttesterSlashing(Box::new(attester_slashing)))
}
}
}
}
Err(format!("Unknown gossipsub topics: {:?}", unknown_topics))
}
/// Encodes a `PubsubMessage` based on the topic encodings. The first known encoding is used. If

View File

@@ -40,4 +40,4 @@ igd = "0.11.1"
itertools = "0.9.0"
num_cpus = "1.13.0"
lru_cache = { path = "../../common/lru_cache" }
get_if_addrs = "0.5.3"
if-addrs = "0.6.4"

View File

@@ -4,7 +4,7 @@
//! - UPnP
use crate::{NetworkConfig, NetworkMessage};
use get_if_addrs::get_if_addrs;
use if_addrs::get_if_addrs;
use slog::{debug, info, warn};
use std::net::{IpAddr, SocketAddr, SocketAddrV4};
use tokio::sync::mpsc;

View File

@@ -428,7 +428,6 @@ fn spawn_service<T: BeaconChainTypes>(
id,
source,
message,
..
} => {
// Update prometheus metrics.
expose_receive_metrics(&message);