mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-14 10:22:38 +00:00
Filter gossipsub message duplication (#736)
* Add duplication prevention to gossipsub * Clean up topic logs * Add content addressed messages for gossip
This commit is contained in:
@@ -1,20 +1,20 @@
|
||||
use crate::config::*;
|
||||
use crate::discovery::Discovery;
|
||||
use crate::rpc::{RPCEvent, RPCMessage, RPC};
|
||||
use crate::GossipTopic;
|
||||
use crate::{error, NetworkConfig};
|
||||
use crate::{Topic, TopicHash};
|
||||
use crate::{BEACON_ATTESTATION_TOPIC, BEACON_BLOCK_TOPIC};
|
||||
use futures::prelude::*;
|
||||
use libp2p::{
|
||||
core::identity::Keypair,
|
||||
discv5::Discv5Event,
|
||||
gossipsub::{Gossipsub, GossipsubEvent},
|
||||
gossipsub::{Gossipsub, GossipsubEvent, MessageId},
|
||||
identify::{Identify, IdentifyEvent},
|
||||
ping::{Ping, PingConfig, PingEvent},
|
||||
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
|
||||
tokio_io::{AsyncRead, AsyncWrite},
|
||||
NetworkBehaviour, PeerId,
|
||||
};
|
||||
use lru::LruCache;
|
||||
use slog::{debug, o};
|
||||
use std::num::NonZeroU32;
|
||||
use std::time::Duration;
|
||||
@@ -39,9 +39,13 @@ pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite> {
|
||||
identify: Identify<TSubstream>,
|
||||
/// Discovery behaviour.
|
||||
discovery: Discovery<TSubstream>,
|
||||
#[behaviour(ignore)]
|
||||
/// The events generated by this behaviour to be consumed in the swarm poll.
|
||||
#[behaviour(ignore)]
|
||||
events: Vec<BehaviourEvent>,
|
||||
/// A cache of recently seen gossip messages. This is used to filter out any possible
|
||||
/// duplicates that may still be seen over gossipsub.
|
||||
#[behaviour(ignore)]
|
||||
seen_gossip_messages: LruCache<PubsubMessage, ()>,
|
||||
/// Logger for behaviour actions.
|
||||
#[behaviour(ignore)]
|
||||
log: slog::Logger,
|
||||
@@ -74,6 +78,7 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
|
||||
discovery: Discovery::new(local_key, net_conf, log)?,
|
||||
ping: Ping::new(ping_config),
|
||||
identify,
|
||||
seen_gossip_messages: LruCache::new(256),
|
||||
events: Vec::new(),
|
||||
log: behaviour_log,
|
||||
})
|
||||
@@ -94,18 +99,22 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<GossipsubE
|
||||
{
|
||||
fn inject_event(&mut self, event: GossipsubEvent) {
|
||||
match event {
|
||||
GossipsubEvent::Message(propagation_source, gs_msg) => {
|
||||
let id = gs_msg.id();
|
||||
GossipsubEvent::Message(propagation_source, id, gs_msg) => {
|
||||
let msg = PubsubMessage::from_topics(&gs_msg.topics, gs_msg.data);
|
||||
|
||||
// Note: We are keeping track here of the peer that sent us the message, not the
|
||||
// peer that originally published the message.
|
||||
self.events.push(BehaviourEvent::GossipMessage {
|
||||
id,
|
||||
source: propagation_source,
|
||||
topics: gs_msg.topics,
|
||||
message: msg,
|
||||
});
|
||||
if self.seen_gossip_messages.put(msg.clone(), ()).is_none() {
|
||||
// if this message isn't a duplicate, notify the network
|
||||
self.events.push(BehaviourEvent::GossipMessage {
|
||||
id,
|
||||
source: propagation_source,
|
||||
topics: gs_msg.topics,
|
||||
message: msg,
|
||||
});
|
||||
} else {
|
||||
debug!(self.log, "A duplicate message was received"; "message" => format!("{:?}", msg));
|
||||
}
|
||||
}
|
||||
GossipsubEvent::Subscribed { peer_id, topic } => {
|
||||
self.events
|
||||
@@ -218,7 +227,7 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
|
||||
|
||||
/// Forwards a message that is waiting in gossipsub's mcache. Messages are only propagated
|
||||
/// once validated by the beacon chain.
|
||||
pub fn propagate_message(&mut self, propagation_source: &PeerId, message_id: String) {
|
||||
pub fn propagate_message(&mut self, propagation_source: &PeerId, message_id: MessageId) {
|
||||
self.gossipsub
|
||||
.propagate_message(&message_id, propagation_source);
|
||||
}
|
||||
@@ -263,7 +272,7 @@ pub enum BehaviourEvent {
|
||||
/// A gossipsub message has been received.
|
||||
GossipMessage {
|
||||
/// The gossipsub message id. Used when propagating blocks after validation.
|
||||
id: String,
|
||||
id: MessageId,
|
||||
/// The peer from which we received this message, not the peer that published it.
|
||||
source: PeerId,
|
||||
/// The topics that this message was sent on.
|
||||
@@ -277,7 +286,7 @@ pub enum BehaviourEvent {
|
||||
|
||||
/// Messages that are passed to and from the pubsub (Gossipsub) behaviour. These are encoded and
|
||||
/// decoded upstream.
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub enum PubsubMessage {
|
||||
/// Gossipsub message providing notification of a new block.
|
||||
Block(Vec<u8>),
|
||||
@@ -302,20 +311,14 @@ impl PubsubMessage {
|
||||
*/
|
||||
fn from_topics(topics: &[TopicHash], data: Vec<u8>) -> Self {
|
||||
for topic in topics {
|
||||
// compare the prefix and postfix, then match on the topic
|
||||
let topic_parts: Vec<&str> = topic.as_str().split('/').collect();
|
||||
if topic_parts.len() == 4
|
||||
&& topic_parts[1] == TOPIC_PREFIX
|
||||
&& topic_parts[3] == TOPIC_ENCODING_POSTFIX
|
||||
{
|
||||
match topic_parts[2] {
|
||||
BEACON_BLOCK_TOPIC => return PubsubMessage::Block(data),
|
||||
BEACON_ATTESTATION_TOPIC => return PubsubMessage::Attestation(data),
|
||||
VOLUNTARY_EXIT_TOPIC => return PubsubMessage::VoluntaryExit(data),
|
||||
PROPOSER_SLASHING_TOPIC => return PubsubMessage::ProposerSlashing(data),
|
||||
ATTESTER_SLASHING_TOPIC => return PubsubMessage::AttesterSlashing(data),
|
||||
_ => {}
|
||||
}
|
||||
match GossipTopic::from(topic.as_str()) {
|
||||
GossipTopic::BeaconBlock => return PubsubMessage::Block(data),
|
||||
GossipTopic::BeaconAttestation => return PubsubMessage::Attestation(data),
|
||||
GossipTopic::VoluntaryExit => return PubsubMessage::VoluntaryExit(data),
|
||||
GossipTopic::ProposerSlashing => return PubsubMessage::ProposerSlashing(data),
|
||||
GossipTopic::AttesterSlashing => return PubsubMessage::AttesterSlashing(data),
|
||||
GossipTopic::Shard => return PubsubMessage::Unknown(data),
|
||||
GossipTopic::Unknown(_) => continue,
|
||||
}
|
||||
}
|
||||
PubsubMessage::Unknown(data)
|
||||
|
||||
Reference in New Issue
Block a user