Update libp2p and gossipsub (#1324)

* Update libp2p and gossipsub

* Remove gossipsub lru cache
This commit is contained in:
Age Manning
2020-07-06 20:34:40 +10:00
committed by GitHub
parent f631155304
commit 5977c00edb
4 changed files with 63 additions and 76 deletions

View File

@@ -39,7 +39,7 @@ environment = { path = "../../lighthouse/environment" }
[dependencies.libp2p]
#version = "0.19.1"
git = "https://github.com/sigp/rust-libp2p"
rev = "a6232506278b9e686248f8d04b79400861b143c2"
rev = "95e27446ca4371e41fc0035b187f60daa19b4b86"
default-features = false
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns", "secio", "tcp-tokio"]

View File

@@ -11,7 +11,7 @@ use libp2p::{
identity::Keypair,
Multiaddr,
},
gossipsub::{Gossipsub, GossipsubEvent, MessageId},
gossipsub::{Gossipsub, GossipsubEvent, MessageId, Signing},
identify::{Identify, IdentifyEvent},
swarm::{
NetworkBehaviour, NetworkBehaviourAction as NBAction, NotifyHandler, PollParameters,
@@ -19,7 +19,6 @@ use libp2p::{
},
PeerId,
};
use lru::LruCache;
use slog::{crit, debug, o};
use std::{
marker::PhantomData,
@@ -53,10 +52,6 @@ pub struct Behaviour<TSpec: EthSpec> {
peers_to_dc: Vec<PeerId>,
/// The current meta data of the node, so respond to pings and get metadata
meta_data: MetaData<TSpec>,
/// A cache of recently seen gossip messages. This is used to filter out any possible
/// duplicates that may still be seen over gossipsub.
// TODO: Remove this
seen_gossip_messages: LruCache<MessageId, ()>,
/// A collections of variables accessible outside the network service.
network_globals: Arc<NetworkGlobals<TSpec>>,
/// Keeps track of the current EnrForkId for upgrading gossipsub topics.
@@ -238,7 +233,6 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
network_globals: Arc<NetworkGlobals<TSpec>>,
log: &slog::Logger,
) -> error::Result<Self> {
let local_peer_id = local_key.public().into_peer_id();
let behaviour_log = log.new(o!());
let identify = Identify::new(
@@ -264,12 +258,14 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
Ok(Behaviour {
eth2_rpc: RPC::new(log.clone()),
gossipsub: Gossipsub::new(local_peer_id, net_conf.gs_config.clone()),
gossipsub: Gossipsub::new(
Signing::Disabled(PeerId::random()),
net_conf.gs_config.clone(),
),
identify,
peer_manager: PeerManager::new(local_key, net_conf, network_globals.clone(), log)?,
events: Vec::new(),
peers_to_dc: Vec::new(),
seen_gossip_messages: LruCache::new(100_000),
meta_data,
network_globals,
enr_fork_id,
@@ -361,7 +357,9 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
for topic in message.topics(GossipEncoding::default(), self.enr_fork_id.fork_digest) {
match message.encode(GossipEncoding::default()) {
Ok(message_data) => {
self.gossipsub.publish(&topic.into(), message_data);
if let Err(e) = self.gossipsub.publish(&topic.into(), message_data) {
slog::warn!(self.log, "Could not publish message"; "error" => format!("{:?}", e));
}
}
Err(e) => crit!(self.log, "Could not publish message"; "error" => e),
}
@@ -572,29 +570,18 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
GossipsubEvent::Message(propagation_source, id, gs_msg) => {
// Note: We are keeping track here of the peer that sent us the message, not the
// peer that originally published the message.
if self.seen_gossip_messages.put(id.clone(), ()).is_none() {
match PubsubMessage::decode(&gs_msg.topics, &gs_msg.data) {
Err(e) => {
debug!(self.log, "Could not decode gossipsub message"; "error" => format!("{}", e))
}
Ok(msg) => {
// if this message isn't a duplicate, notify the network
self.events.push(BehaviourEvent::PubsubMessage {
id,
source: propagation_source,
topics: gs_msg.topics,
message: msg,
});
}
match PubsubMessage::decode(&gs_msg.topics, &gs_msg.data) {
Err(e) => {
debug!(self.log, "Could not decode gossipsub message"; "error" => format!("{}", e))
}
} else {
match PubsubMessage::<TSpec>::decode(&gs_msg.topics, &gs_msg.data) {
Err(e) => {
debug!(self.log, "Could not decode gossipsub message"; "error" => format!("{}", e))
}
Ok(msg) => {
debug!(self.log, "A duplicate gossipsub message was received"; "message_source" => format!("{}", gs_msg.source), "propagated_peer" => format!("{}",propagation_source), "message" => format!("{}", msg));
}
Ok(msg) => {
// if this message isn't a duplicate, notify the network
self.events.push(BehaviourEvent::PubsubMessage {
id,
source: propagation_source,
topics: gs_msg.topics,
message: msg,
});
}
}
}

View File

@@ -94,8 +94,8 @@ impl Default for Config {
let gs_config = GossipsubConfigBuilder::new()
.max_transmit_size(GOSSIP_MAX_SIZE)
.heartbeat_interval(Duration::from_secs(1))
.history_length(385) // A heartbeat is 1 second. We want to keep an epoch worth of history,
.manual_propagation() // require validation before propagation
.no_source_id()
.message_id_fn(gossip_message_id)
.build();