Downgrade libp2p and gosispsub (#1358)

Downgrades libp2p and the gossipsub updates. 

This looks to resolve the CPU usage issue we have been seeing. 

The root cause is likely inside the latest gossipsub updates, which will be addressed in a later PR
This commit is contained in:
Age Manning
2020-07-15 05:04:09 +00:00
parent 4b213032b2
commit 4a01f44206
4 changed files with 99 additions and 63 deletions

View File

@@ -11,7 +11,7 @@ use libp2p::{
identity::Keypair,
Multiaddr,
},
gossipsub::{Gossipsub, GossipsubEvent, MessageId, Signing},
gossipsub::{Gossipsub, GossipsubEvent, MessageId},
identify::{Identify, IdentifyEvent},
swarm::{
NetworkBehaviour, NetworkBehaviourAction as NBAction, NotifyHandler, PollParameters,
@@ -19,6 +19,7 @@ use libp2p::{
},
PeerId,
};
use lru::LruCache;
use slog::{crit, debug, o, trace};
use std::{
collections::VecDeque,
@@ -55,6 +56,10 @@ pub struct Behaviour<TSpec: EthSpec> {
peers_to_dc: VecDeque<PeerId>,
/// The current meta data of the node, so respond to pings and get metadata
meta_data: MetaData<TSpec>,
/// A cache of recently seen gossip messages. This is used to filter out any possible
/// duplicates that may still be seen over gossipsub.
// TODO: Remove this
seen_gossip_messages: LruCache<MessageId, ()>,
/// A collections of variables accessible outside the network service.
network_globals: Arc<NetworkGlobals<TSpec>>,
/// Keeps track of the current EnrForkId for upgrading gossipsub topics.
@@ -75,6 +80,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
network_globals: Arc<NetworkGlobals<TSpec>>,
log: &slog::Logger,
) -> error::Result<Self> {
let local_peer_id = local_key.public().into_peer_id();
let behaviour_log = log.new(o!());
let identify = Identify::new(
@@ -100,15 +106,13 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
Ok(Behaviour {
eth2_rpc: RPC::new(log.clone()),
gossipsub: Gossipsub::new(
Signing::Disabled(PeerId::random()),
net_conf.gs_config.clone(),
),
gossipsub: Gossipsub::new(local_peer_id, net_conf.gs_config.clone()),
identify,
peer_manager: PeerManager::new(local_key, net_conf, network_globals.clone(), log)?,
events: VecDeque::new(),
handler_events: VecDeque::new(),
peers_to_dc: VecDeque::new(),
seen_gossip_messages: LruCache::new(100_000),
meta_data,
network_globals,
enr_fork_id,
@@ -211,9 +215,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
for topic in message.topics(GossipEncoding::default(), self.enr_fork_id.fork_digest) {
match message.encode(GossipEncoding::default()) {
Ok(message_data) => {
if let Err(e) = self.gossipsub.publish(&topic.into(), message_data) {
slog::warn!(self.log, "Could not publish message"; "error" => format!("{:?}", e));
}
self.gossipsub.publish(&topic.into(), message_data);
}
Err(e) => crit!(self.log, "Could not publish message"; "error" => e),
}
@@ -393,17 +395,31 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
fn on_gossip_event(&mut self, event: GossipsubEvent) {
match event {
GossipsubEvent::Message(propagation_source, id, gs_msg) => {
match PubsubMessage::decode(&gs_msg.topics, &gs_msg.data) {
Err(e) => {
debug!(self.log, "Could not decode gossipsub message"; "error" => format!("{}", e))
// Note: We are keeping track here of the peer that sent us the message, not the
// peer that originally published the message.
if self.seen_gossip_messages.put(id.clone(), ()).is_none() {
match PubsubMessage::decode(&gs_msg.topics, &gs_msg.data) {
Err(e) => {
debug!(self.log, "Could not decode gossipsub message"; "error" => format!("{}", e))
}
Ok(msg) => {
// if this message isn't a duplicate, notify the network
self.add_event(BehaviourEvent::PubsubMessage {
id,
source: propagation_source,
topics: gs_msg.topics,
message: msg,
});
}
}
Ok(msg) => {
self.add_event(BehaviourEvent::PubsubMessage {
id,
source: propagation_source,
topics: gs_msg.topics,
message: msg,
});
} else {
match PubsubMessage::<TSpec>::decode(&gs_msg.topics, &gs_msg.data) {
Err(e) => {
debug!(self.log, "Could not decode gossipsub message"; "error" => format!("{}", e))
}
Ok(msg) => {
debug!(self.log, "A duplicate gossipsub message was received"; "message_source" => format!("{}", gs_msg.source), "propagated_peer" => format!("{}",propagation_source), "message" => format!("{}", msg));
}
}
}
}