Implement gossipsub IDONTWANT (#5422)

* move gossipsub into a separate crate

* Merge branch 'unstable' of github.com:sigp/lighthouse into separate-gossipsub

* update rpc.proto and generate rust bindings

* gossipsub: implement IDONTWANT messages

* address review

* move GossipPromises out of PeerScore

* impl PeerKind::is_gossipsub

that returns true if peer speaks any version of gossipsub

* address review 2

* Merge branch 'separate-gossipsub' of github.com:sigp/lighthouse into impl-gossipsub-idontwant

* Merge branch 'unstable' of github.com:sigp/lighthouse into impl-gossipsub-idontwant

* add metrics

* add tests

* make 1.2 beta before spec is merged

* Merge branch 'unstable' of github.com:sigp/lighthouse into impl-gossipsub-idontwant

* cargo clippy

* Collect decoded IDONTWANT messages

* Use the beta tag in most places to simplify the transition

* Fix failed test by using fresh message-ids

* Gossipsub v1.2-beta

* Merge latest unstable

* Cargo update

* Merge pull request #5 from ackintosh/impl-gossipsub-idontwant-ackintosh-fix-test

Fix `test_ignore_too_many_messages_in_ihave` test

* Merge branch 'unstable' of github.com:sigp/lighthouse into impl-gossipsub-idontwant

* update CHANGELOG.md

* remove beta tag for 1.2, as the IDONTWANT spec has been merged

* Merge branch 'unstable' of github.com:sigp/lighthouse into impl-gossipsub-idontwant

* Merge branch 'impl-gossipsub-idontwant' of github.com:jxs/lighthouse into impl-gossipsub-idontwant

* Merge branch 'unstable' of github.com:sigp/lighthouse into impl-gossipsub-idontwant

* improve comments wording

* Merge branch 'impl-gossipsub-idontwant' of github.com:jxs/lighthouse into impl-gossipsub-idontwant
This commit is contained in:
João Oliveira
2024-07-09 06:37:19 +01:00
committed by GitHub
parent 2e2ccec9b5
commit d46ac6c3d3
13 changed files with 533 additions and 48 deletions

16
Cargo.lock generated
View File

@@ -2069,7 +2069,7 @@ dependencies = [
"enr",
"fnv",
"futures",
"hashlink",
"hashlink 0.8.4",
"hex",
"hkdf",
"lazy_static",
@@ -3357,6 +3357,7 @@ dependencies = [
"futures-ticker",
"futures-timer",
"getrandom",
"hashlink 0.9.0",
"hex_fmt",
"libp2p",
"prometheus-client",
@@ -3472,6 +3473,15 @@ dependencies = [
"hashbrown 0.14.5",
]
[[package]]
name = "hashlink"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "692eaaf7f7607518dd3cef090f1474b61edc5301d8012f09579920df68b725ee"
dependencies = [
"hashbrown 0.14.5",
]
[[package]]
name = "headers"
version = "0.3.9"
@@ -6956,7 +6966,7 @@ dependencies = [
"bitflags 1.3.2",
"fallible-iterator",
"fallible-streaming-iterator",
"hashlink",
"hashlink 0.8.4",
"libsqlite3-sys",
"smallvec",
]
@@ -9773,7 +9783,7 @@ checksum = "498f4d102a79ea1c9d4dd27573c0fc96ad74c023e8da38484e47883076da25fb"
dependencies = [
"arraydeque",
"encoding_rs",
"hashlink",
"hashlink 0.8.4",
]
[[package]]

View File

@@ -126,6 +126,7 @@ fnv = "1"
fs2 = "0.4"
futures = "0.3"
hex = "0.4"
hashlink = "0.9.0"
hyper = "1"
itertools = "0.10"
lazy_static = "1"

View File

@@ -1,5 +1,8 @@
## 0.5 Sigma Prime fork
- Implement IDONTWANT messages as per [spec](https://github.com/libp2p/specs/pull/548).
See [PR 5422](https://github.com/sigp/lighthouse/pull/5422)
- Attempt to publish to at least mesh_n peers when publishing a message when flood publish is disabled.
See [PR 5357](https://github.com/sigp/lighthouse/pull/5357).
- Drop `Publish` and `Forward` gossipsub stale messages when polling ConnectionHandler.

View File

@@ -24,6 +24,7 @@ futures = "0.3.30"
futures-ticker = "0.0.3"
futures-timer = "3.0.2"
getrandom = "0.2.12"
hashlink.workspace = true
hex_fmt = "0.3.0"
libp2p = { version = "0.53", default-features = false }
quick-protobuf = "0.8"

View File

@@ -31,6 +31,7 @@ use std::{
use futures::StreamExt;
use futures_ticker::Ticker;
use hashlink::LinkedHashMap;
use prometheus_client::registry::Registry;
use rand::{seq::SliceRandom, thread_rng};
@@ -45,6 +46,8 @@ use libp2p::swarm::{
};
use web_time::{Instant, SystemTime};
use crate::types::IDontWant;
use super::gossip_promises::GossipPromises;
use super::handler::{Handler, HandlerEvent, HandlerIn};
use super::mcache::MessageCache;
@@ -73,6 +76,12 @@ use std::{cmp::Ordering::Equal, fmt::Debug};
#[cfg(test)]
mod tests;
/// IDONTWANT cache capacity.
const IDONTWANT_CAP: usize = 10_000;
/// IDONTWANT timeout before removal.
const IDONTWANT_TIMEOUT: Duration = Duration::new(3, 0);
/// Determines if published messages should be signed or not.
///
/// Without signing, a number of privacy preserving modes can be selected.
@@ -304,9 +313,8 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
/// discovery and not by PX).
outbound_peers: HashSet<PeerId>,
/// Stores optional peer score data together with thresholds, decay interval and gossip
/// promises.
peer_score: Option<(PeerScore, PeerScoreThresholds, Ticker, GossipPromises)>,
/// Stores optional peer score data together with thresholds and decay interval.
peer_score: Option<(PeerScore, PeerScoreThresholds, Ticker)>,
/// Counts the number of `IHAVE` received from each peer since the last heartbeat.
count_received_ihave: HashMap<PeerId, usize>,
@@ -331,6 +339,9 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
/// Tracks the numbers of failed messages per peer-id.
failed_messages: HashMap<PeerId, FailedMessages>,
/// Tracks recently sent `IWANT` messages and checks if peers respond to them.
gossip_promises: GossipPromises,
}
impl<D, F> Behaviour<D, F>
@@ -467,6 +478,7 @@ where
subscription_filter,
data_transform,
failed_messages: Default::default(),
gossip_promises: Default::default(),
})
}
}
@@ -919,7 +931,7 @@ where
let interval = Ticker::new(params.decay_interval);
let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback);
self.peer_score = Some((peer_score, threshold, interval, GossipPromises::default()));
self.peer_score = Some((peer_score, threshold, interval));
Ok(())
}
@@ -1187,7 +1199,7 @@ where
}
fn score_below_threshold_from_scores(
peer_score: &Option<(PeerScore, PeerScoreThresholds, Ticker, GossipPromises)>,
peer_score: &Option<(PeerScore, PeerScoreThresholds, Ticker)>,
peer_id: &PeerId,
threshold: impl Fn(&PeerScoreThresholds) -> f64,
) -> (bool, f64) {
@@ -1248,10 +1260,7 @@ where
return false;
}
self.peer_score
.as_ref()
.map(|(_, _, _, promises)| !promises.contains(id))
.unwrap_or(true)
!self.gossip_promises.contains(id)
};
for (topic, ids) in ihave_msgs {
@@ -1298,13 +1307,11 @@ where
iwant_ids_vec.truncate(iask);
*iasked += iask;
if let Some((_, _, _, gossip_promises)) = &mut self.peer_score {
gossip_promises.add_promise(
*peer_id,
&iwant_ids_vec,
Instant::now() + self.config.iwant_followup_time(),
);
}
self.gossip_promises.add_promise(
*peer_id,
&iwant_ids_vec,
Instant::now() + self.config.iwant_followup_time(),
);
if let Some(peer) = &mut self.connected_peers.get_mut(peer_id) {
tracing::trace!(
@@ -1369,6 +1376,11 @@ where
"IWANT: Peer has asked for message too many times; ignoring request"
);
} else if let Some(peer) = &mut self.connected_peers.get_mut(peer_id) {
if peer.dont_send.get(&id).is_some() {
tracing::debug!(%peer_id, message=%id, "Peer already sent IDONTWANT for this message");
continue;
}
tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
if peer
.sender
@@ -1706,14 +1718,15 @@ where
peer=%propagation_source,
"Rejecting message from blacklisted peer"
);
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
self.gossip_promises
.reject_message(msg_id, &RejectReason::BlackListedPeer);
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.reject_message(
propagation_source,
msg_id,
&raw_message.topic,
RejectReason::BlackListedPeer,
);
gossip_promises.reject_message(msg_id, &RejectReason::BlackListedPeer);
}
return false;
}
@@ -1795,6 +1808,9 @@ where
// Calculate the message id on the transformed data.
let msg_id = self.config.message_id(&message);
// Broadcast IDONTWANT messages.
self.send_idontwant(&raw_message, &msg_id, propagation_source);
// Check the validity of the message
// Peers get penalized if this message is invalid. We don't add it to the duplicate cache
// and instead continually penalize peers that repeatedly send this message.
@@ -1820,11 +1836,12 @@ where
metrics.msg_recvd(&message.topic);
}
// Tells score that message arrived (but is maybe not fully validated yet).
// Consider the message as delivered for gossip promises.
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
self.gossip_promises.message_delivered(&msg_id);
// Tells score that message arrived (but is maybe not fully validated yet).
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.validate_message(propagation_source, &msg_id, &message.topic);
gossip_promises.message_delivered(&msg_id);
}
// Add the message to our memcache
@@ -1871,7 +1888,7 @@ where
raw_message: &RawMessage,
reject_reason: RejectReason,
) {
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
if let Some((peer_score, ..)) = &mut self.peer_score {
if let Some(metrics) = self.metrics.as_mut() {
metrics.register_invalid_message(&raw_message.topic);
}
@@ -1886,7 +1903,8 @@ where
reject_reason,
);
gossip_promises.reject_message(&message_id, &reject_reason);
self.gossip_promises
.reject_message(&message_id, &reject_reason);
} else {
// The message is invalid, we reject it ignoring any gossip promises. If a peer is
// advertising this message via an IHAVE and it's invalid it will be double
@@ -1959,7 +1977,7 @@ where
}
// if the mesh needs peers add the peer to the mesh
if !self.explicit_peers.contains(propagation_source)
&& matches!(peer.kind, PeerKind::Gossipsubv1_1 | PeerKind::Gossipsub)
&& peer.kind.is_gossipsub()
&& !Self::score_below_threshold_from_scores(
&self.peer_score,
propagation_source,
@@ -2066,8 +2084,8 @@ where
/// Applies penalties to peers that did not respond to our IWANT requests.
fn apply_iwant_penalties(&mut self) {
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
for (peer, count) in gossip_promises.get_broken_promises() {
if let Some((peer_score, ..)) = &mut self.peer_score {
for (peer, count) in self.gossip_promises.get_broken_promises() {
peer_score.add_penalty(&peer, count);
if let Some(metrics) = self.metrics.as_mut() {
metrics.register_score_penalty(Penalty::BrokenPromise);
@@ -2288,7 +2306,7 @@ where
&& peers.len() > 1
&& self.peer_score.is_some()
{
if let Some((_, thresholds, _, _)) = &self.peer_score {
if let Some((_, thresholds, _)) = &self.peer_score {
// Opportunistic grafting works as follows: we check the median score of peers
// in the mesh; if this score is below the opportunisticGraftThreshold, we
// select a few peers at random with score over the median.
@@ -2381,7 +2399,7 @@ where
for (topic_hash, peers) in self.fanout.iter_mut() {
let mut to_remove_peers = Vec::new();
let publish_threshold = match &self.peer_score {
Some((_, thresholds, _, _)) => thresholds.publish_threshold,
Some((_, thresholds, _)) => thresholds.publish_threshold,
_ => 0.0,
};
for peer_id in peers.iter() {
@@ -2474,6 +2492,17 @@ where
}
self.failed_messages.shrink_to_fit();
// Flush stale IDONTWANTs.
for peer in self.connected_peers.values_mut() {
while let Some((_front, instant)) = peer.dont_send.front() {
if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() {
break;
} else {
peer.dont_send.pop_front();
}
}
}
tracing::debug!("Completed Heartbeat");
if let Some(metrics) = self.metrics.as_mut() {
let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
@@ -2655,6 +2684,59 @@ where
}
}
/// Helper function which sends an IDONTWANT message to mesh\[topic\] peers.
fn send_idontwant(
    &mut self,
    message: &RawMessage,
    msg_id: &MessageId,
    propagation_source: &PeerId,
) {
    // Without a mesh for this topic there is nobody to notify.
    let Some(topic_mesh) = self.mesh.get(&message.topic) else {
        return;
    };

    // Peers we asked for this message via IWANT should also be told that
    // we no longer want it.
    let promised_peers = self.gossip_promises.peers_for_message(msg_id);

    // Notify everyone except the peer that delivered the message and the
    // message's original author.
    let targets: Vec<PeerId> = topic_mesh
        .iter()
        .chain(promised_peers.iter())
        .filter(|p| *p != propagation_source && Some(*p) != message.source.as_ref())
        .copied()
        .collect();

    for peer_id in &targets {
        let Some(peer) = self.connected_peers.get_mut(peer_id) else {
            tracing::error!(peer = %peer_id,
                "Could not IDONTWANT, peer doesn't exist in connected peer list");
            continue;
        };

        // Only gossipsub 1.2 peers support IDONTWANT.
        if peer.kind != PeerKind::Gossipsubv1_2_beta {
            continue;
        }

        let idontwant = IDontWant {
            message_ids: vec![msg_id.clone()],
        };
        if peer.sender.idontwant(idontwant).is_err() {
            tracing::warn!(peer=%peer_id, "Send Queue full. Could not send IDONTWANT");

            if let Some((peer_score, ..)) = &mut self.peer_score {
                peer_score.failed_message_slow_peer(peer_id);
            }
            // Increment failed message count
            self.failed_messages
                .entry(*peer_id)
                .or_default()
                .non_priority += 1;
        }
    }
}
/// Helper function which forwards a message to mesh\[topic\] peers.
///
/// Returns true if at least one peer was messaged.
@@ -2708,6 +2790,11 @@ where
if !recipient_peers.is_empty() {
for peer_id in recipient_peers.iter() {
if let Some(peer) = self.connected_peers.get_mut(peer_id) {
if peer.dont_send.get(msg_id).is_some() {
tracing::debug!(%peer_id, message=%msg_id, "Peer doesn't want message");
continue;
}
tracing::debug!(%peer_id, message=%msg_id, "Sending message to peer");
if peer
.sender
@@ -3057,6 +3144,7 @@ where
connections: vec![],
sender: RpcSender::new(self.config.connection_handler_queue_len()),
topics: Default::default(),
dont_send: LinkedHashMap::new(),
});
// Add the new connection
connected_peer.connections.push(connection_id);
@@ -3087,6 +3175,7 @@ where
connections: vec![],
sender: RpcSender::new(self.config.connection_handler_queue_len()),
topics: Default::default(),
dont_send: LinkedHashMap::new(),
});
// Add the new connection
connected_peer.connections.push(connection_id);
@@ -3136,7 +3225,7 @@ where
}
HandlerEvent::MessageDropped(rpc) => {
// Account for this in the scoring logic
if let Some((peer_score, _, _, _)) = &mut self.peer_score {
if let Some((peer_score, _, _)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(&propagation_source);
}
@@ -3245,6 +3334,24 @@ where
peers,
backoff,
}) => prune_msgs.push((topic_hash, peers, backoff)),
ControlAction::IDontWant(IDontWant { message_ids }) => {
let Some(peer) = self.connected_peers.get_mut(&propagation_source)
else {
tracing::error!(peer = %propagation_source,
"Could not handle IDONTWANT, peer doesn't exist in connected peer list");
continue;
};
if let Some(metrics) = self.metrics.as_mut() {
metrics.register_idontwant(message_ids.len());
}
for message_id in message_ids {
peer.dont_send.insert(message_id, Instant::now());
// Don't exceed capacity.
if peer.dont_send.len() > IDONTWANT_CAP {
peer.dont_send.pop_front();
}
}
}
}
}
if !ihave_msgs.is_empty() {
@@ -3270,7 +3377,7 @@ where
}
// update scores
if let Some((peer_score, _, interval, _)) = &mut self.peer_score {
if let Some((peer_score, _, interval)) = &mut self.peer_score {
while let Poll::Ready(Some(_)) = interval.poll_next_unpin(cx) {
peer_score.refresh_scores();
}
@@ -3395,7 +3502,7 @@ fn get_random_peers_dynamic(
.iter()
.filter(|(_, p)| p.topics.contains(topic_hash))
.filter(|(peer_id, _)| f(peer_id))
.filter(|(_, p)| p.kind == PeerKind::Gossipsub || p.kind == PeerKind::Gossipsubv1_1)
.filter(|(_, p)| p.kind.is_gossipsub())
.map(|(peer_id, _)| *peer_id)
.collect::<Vec<PeerId>>();

View File

@@ -31,13 +31,7 @@ use std::net::Ipv4Addr;
use std::thread::sleep;
#[derive(Default, Debug)]
struct InjectNodes<D, F>
// TODO: remove trait bound Default when this issue is fixed:
// https://github.com/colin-kiegel/rust-derive-builder/issues/93
where
D: DataTransform + Default + Clone + Send + 'static,
F: TopicSubscriptionFilter + Clone + Default + Send + 'static,
{
struct InjectNodes<D, F> {
peer_no: usize,
topics: Vec<String>,
to_subscribe: bool,
@@ -47,6 +41,7 @@ where
scoring: Option<(PeerScoreParams, PeerScoreThresholds)>,
data_transform: D,
subscription_filter: F,
peer_kind: Option<PeerKind>,
}
impl<D, F> InjectNodes<D, F>
@@ -94,7 +89,7 @@ where
let empty = vec![];
for i in 0..self.peer_no {
let (peer, receiver) = add_peer(
let (peer, receiver) = add_peer_with_addr_and_kind(
&mut gs,
if self.to_subscribe {
&topic_hashes
@@ -103,6 +98,8 @@ where
},
i < self.outbound,
i < self.explicit,
Multiaddr::empty(),
self.peer_kind.clone().or(Some(PeerKind::Gossipsubv1_1)),
);
peers.push(peer);
receivers.insert(peer, receiver);
@@ -151,6 +148,11 @@ where
self.subscription_filter = subscription_filter;
self
}
fn peer_kind(mut self, peer_kind: PeerKind) -> Self {
self.peer_kind = Some(peer_kind);
self
}
}
fn inject_nodes<D, F>() -> InjectNodes<D, F>
@@ -235,6 +237,7 @@ where
kind: kind.clone().unwrap_or(PeerKind::Floodsub),
connections: vec![connection_id],
topics: Default::default(),
dont_send: LinkedHashMap::new(),
sender,
},
);
@@ -620,6 +623,7 @@ fn test_join() {
kind: PeerKind::Floodsub,
connections: vec![connection_id],
topics: Default::default(),
dont_send: LinkedHashMap::new(),
sender,
},
);
@@ -1015,6 +1019,7 @@ fn test_get_random_peers() {
connections: vec![ConnectionId::new_unchecked(0)],
topics: topics.clone(),
sender: RpcSender::new(gs.config.connection_handler_queue_len()),
dont_send: LinkedHashMap::new(),
},
);
}
@@ -4580,9 +4585,9 @@ fn test_ignore_too_many_messages_in_ihave() {
let (peer, receiver) = add_peer(&mut gs, &topics, false, false);
receivers.insert(peer, receiver);
//peer has 20 messages
//peer has 30 messages
let mut seq = 0;
let message_ids: Vec<_> = (0..20)
let message_ids: Vec<_> = (0..30)
.map(|_| random_message(&mut seq, &topics))
.map(|msg| gs.data_transform.inbound_transform(msg).unwrap())
.map(|msg| config.message_id(&msg))
@@ -4624,7 +4629,7 @@ fn test_ignore_too_many_messages_in_ihave() {
gs.heartbeat();
gs.handle_ihave(
&peer,
vec![(topics[0].clone(), message_ids[10..20].to_vec())],
vec![(topics[0].clone(), message_ids[20..30].to_vec())],
);
//we sent 10 iwant messages ids via a IWANT rpc.
@@ -5236,3 +5241,191 @@ fn test_graft_without_subscribe() {
// We unsubscribe from the topic.
let _ = gs.unsubscribe(&Topic::new(topic));
}
/// Test that a node sends IDONTWANT messages to the mesh peers
/// that run Gossipsub v1.2.
#[test]
fn sends_idontwant() {
    // Five peers speaking Gossipsub v1.2-beta; one of them is an explicit peer.
    let (mut gs, peers, receivers, topic_hashes) = inject_nodes1()
        .peer_no(5)
        .topics(vec![String::from("topic1")])
        .to_subscribe(true)
        .gs_config(Config::default())
        .explicit(1)
        .peer_kind(PeerKind::Gossipsubv1_2_beta)
        .create_network();

    let local_id = PeerId::random();

    // A message authored by peers[1], delivered to us by `local_id`.
    let message = RawMessage {
        source: Some(peers[1]),
        data: vec![12],
        sequence_number: Some(0),
        topic: topic_hashes[0].clone(),
        signature: None,
        key: None,
        validated: true,
    };

    gs.handle_received_message(message.clone(), &local_id);

    // Count IDONTWANT messages across all non-priority queues; the message's
    // author (peers[1]) must never receive one. Three recipients are
    // expected — presumably the 5 peers minus the explicit peer and the
    // author; TODO confirm against mesh construction in inject_nodes.
    assert_eq!(
        receivers
            .into_iter()
            .fold(0, |mut idontwants, (peer_id, c)| {
                let non_priority = c.non_priority.into_inner();
                while !non_priority.is_empty() {
                    if let Ok(RpcOut::IDontWant(_)) = non_priority.try_recv() {
                        assert_ne!(peer_id, peers[1]);
                        idontwants += 1;
                    }
                }
                idontwants
            }),
        3,
        "IDONTWANT was not sent"
    );
}
/// Test that a node doesn't send IDONTWANT messages to the mesh peers
/// that don't run Gossipsub v1.2.
#[test]
fn doesnt_send_idontwant() {
    // Same setup as the positive test, except every peer only speaks
    // Gossipsub v1.1, which predates IDONTWANT.
    let (mut gs, peers, receivers, topic_hashes) = inject_nodes1()
        .peer_no(5)
        .topics(vec![String::from("topic1")])
        .to_subscribe(true)
        .gs_config(Config::default())
        .explicit(1)
        .peer_kind(PeerKind::Gossipsubv1_1)
        .create_network();

    let local_id = PeerId::random();

    // A message authored by peers[1], delivered to us by `local_id`.
    let message = RawMessage {
        source: Some(peers[1]),
        data: vec![12],
        sequence_number: Some(0),
        topic: topic_hashes[0].clone(),
        signature: None,
        key: None,
        validated: true,
    };

    gs.handle_received_message(message.clone(), &local_id);

    // No IDONTWANT may appear in any queue since no connected peer
    // supports the v1.2 protocol.
    assert_eq!(
        receivers
            .into_iter()
            .fold(0, |mut idontwants, (peer_id, c)| {
                let non_priority = c.non_priority.into_inner();
                while !non_priority.is_empty() {
                    if matches!(non_priority.try_recv(), Ok(RpcOut::IDontWant(_)) if peer_id != peers[1]) {
                        idontwants += 1;
                    }
                }
                idontwants
            }),
        0,
        "IDONTWANT were sent"
    );
}
/// Test that a node doesn't forward a message to mesh peers
/// that sent IDONTWANT for it.
#[test]
fn doesnt_forward_idontwant() {
    let (mut gs, peers, receivers, topic_hashes) = inject_nodes1()
        .peer_no(4)
        .topics(vec![String::from("topic1")])
        .to_subscribe(true)
        .gs_config(Config::default())
        .explicit(1)
        .peer_kind(PeerKind::Gossipsubv1_2_beta)
        .create_network();

    let local_id = PeerId::random();

    // A message authored by peers[1], delivered to us by `local_id`.
    let raw_message = RawMessage {
        source: Some(peers[1]),
        data: vec![12],
        sequence_number: Some(0),
        topic: topic_hashes[0].clone(),
        signature: None,
        key: None,
        validated: true,
    };

    // Compute the message id the router will derive, and pretend peers[2]
    // already told us via IDONTWANT that it doesn't want this message.
    let message = gs
        .data_transform
        .inbound_transform(raw_message.clone())
        .unwrap();
    let message_id = gs.config.message_id(&message);
    let peer = gs.connected_peers.get_mut(&peers[2]).unwrap();
    peer.dont_send.insert(message_id, Instant::now());

    gs.handle_received_message(raw_message.clone(), &local_id);

    // The message must be forwarded to the eligible peers only; peers[2]
    // (which sent IDONTWANT) must never receive a Forward.
    assert_eq!(
        receivers.into_iter().fold(0, |mut fwds, (peer_id, c)| {
            let non_priority = c.non_priority.into_inner();
            while !non_priority.is_empty() {
                if let Ok(RpcOut::Forward { .. }) = non_priority.try_recv() {
                    assert_ne!(peer_id, peers[2]);
                    fwds += 1;
                }
            }
            fwds
        }),
        2,
        "Message was not forwarded to the expected number of peers"
    );
}
/// Test that a node parses an
/// IDONTWANT message to the respective peer.
#[test]
fn parses_idontwant() {
    let (mut gs, peers, _receivers, _topic_hashes) = inject_nodes1()
        .peer_no(2)
        .topics(vec![String::from("topic1")])
        .to_subscribe(true)
        .gs_config(Config::default())
        .explicit(1)
        .peer_kind(PeerKind::Gossipsubv1_2_beta)
        .create_network();

    let message_id = MessageId::new(&[0, 1, 2, 3]);
    // Deliver an RPC that carries only an IDONTWANT control message,
    // as if it arrived from peers[1] over connection 0.
    let rpc = Rpc {
        messages: vec![],
        subscriptions: vec![],
        control_msgs: vec![ControlAction::IDontWant(IDontWant {
            message_ids: vec![message_id.clone()],
        })],
    };
    gs.on_connection_handler_event(
        peers[1],
        ConnectionId::new_unchecked(0),
        HandlerEvent::Message {
            rpc,
            invalid_messages: vec![],
        },
    );

    // The advertised message id must now be recorded in that peer's
    // `dont_send` map so we skip forwarding it later.
    let peer = gs.connected_peers.get_mut(&peers[1]).unwrap();
    assert!(peer.dont_send.get(&message_id).is_some());
}
/// Test that a node clears stale IDONTWANT messages.
#[test]
fn clear_stale_idontwant() {
    let (mut gs, peers, _receivers, _topic_hashes) = inject_nodes1()
        .peer_no(4)
        .topics(vec![String::from("topic1")])
        .to_subscribe(true)
        .gs_config(Config::default())
        .explicit(1)
        .peer_kind(PeerKind::Gossipsubv1_2_beta)
        .create_network();

    // Record an IDONTWANT entry for peers[2] timestamped "now".
    let peer = gs.connected_peers.get_mut(&peers[2]).unwrap();
    peer.dont_send
        .insert(MessageId::new(&[1, 2, 3, 4]), Instant::now());
    // Wait out the IDONTWANT timeout, then run a heartbeat, which flushes
    // entries older than the timeout.
    // NOTE(review): sleeping exactly 3s (the apparent IDONTWANT_TIMEOUT)
    // makes this timing-sensitive; a slightly longer sleep would be more
    // robust — confirm against IDONTWANT_TIMEOUT.
    std::thread::sleep(Duration::from_secs(3));
    gs.heartbeat();
    let peer = gs.connected_peers.get_mut(&peers[2]).unwrap();
    assert!(peer.dont_send.is_empty());
}

View File

@@ -154,6 +154,7 @@ pub struct ControlMessage {
pub iwant: Vec<gossipsub::pb::ControlIWant>,
pub graft: Vec<gossipsub::pb::ControlGraft>,
pub prune: Vec<gossipsub::pb::ControlPrune>,
pub idontwant: Vec<gossipsub::pb::ControlIDontWant>,
}
impl<'a> MessageRead<'a> for ControlMessage {
@@ -165,6 +166,7 @@ impl<'a> MessageRead<'a> for ControlMessage {
Ok(18) => msg.iwant.push(r.read_message::<gossipsub::pb::ControlIWant>(bytes)?),
Ok(26) => msg.graft.push(r.read_message::<gossipsub::pb::ControlGraft>(bytes)?),
Ok(34) => msg.prune.push(r.read_message::<gossipsub::pb::ControlPrune>(bytes)?),
Ok(42) => msg.idontwant.push(r.read_message::<gossipsub::pb::ControlIDontWant>(bytes)?),
Ok(t) => { r.read_unknown(bytes, t)?; }
Err(e) => return Err(e),
}
@@ -180,6 +182,7 @@ impl MessageWrite for ControlMessage {
+ self.iwant.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::<usize>()
+ self.graft.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::<usize>()
+ self.prune.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::<usize>()
+ self.idontwant.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::<usize>()
}
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
@@ -187,6 +190,7 @@ impl MessageWrite for ControlMessage {
for s in &self.iwant { w.write_with_tag(18, |w| w.write_message(s))?; }
for s in &self.graft { w.write_with_tag(26, |w| w.write_message(s))?; }
for s in &self.prune { w.write_with_tag(34, |w| w.write_message(s))?; }
for s in &self.idontwant { w.write_with_tag(42, |w| w.write_message(s))?; }
Ok(())
}
}
@@ -331,6 +335,38 @@ impl MessageWrite for ControlPrune {
}
}
// quick-protobuf codegen for the gossipsub `ControlIDontWant` control
// message (see rpc.proto): a single `repeated bytes message_ids = 1` field.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct ControlIDontWant {
    // Raw message ids the remote peer does not want to receive.
    pub message_ids: Vec<Vec<u8>>,
}

impl<'a> MessageRead<'a> for ControlIDontWant {
    // Decodes a ControlIDontWant, collecting every field-1 bytes entry and
    // skipping unknown tags for forward compatibility.
    fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> {
        let mut msg = Self::default();
        while !r.is_eof() {
            match r.next_tag(bytes) {
                // tag 10 = field number 1, wire type 2 (length-delimited)
                Ok(10) => msg.message_ids.push(r.read_bytes(bytes)?.to_owned()),
                Ok(t) => { r.read_unknown(bytes, t)?; }
                Err(e) => return Err(e),
            }
        }
        Ok(msg)
    }
}

impl MessageWrite for ControlIDontWant {
    // Encoded size: one tag byte plus a length-prefixed payload per id.
    fn get_size(&self) -> usize {
        0
        + self.message_ids.iter().map(|s| 1 + sizeof_len((s).len())).sum::<usize>()
    }

    fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
        for s in &self.message_ids { w.write_with_tag(10, |w| w.write_bytes(&**s))?; }
        Ok(())
    }
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct PeerInfo {

View File

@@ -28,6 +28,7 @@ message ControlMessage {
repeated ControlIWant iwant = 2;
repeated ControlGraft graft = 3;
repeated ControlPrune prune = 4;
repeated ControlIDontWant idontwant = 5;
}
message ControlIHave {
@@ -49,6 +50,10 @@ message ControlPrune {
optional uint64 backoff = 3; // gossipsub v1.1 backoff time (in seconds)
}
message ControlIDontWant {
repeated bytes message_ids = 1;
}
message PeerInfo {
optional bytes peer_id = 1;
optional bytes signed_peer_record = 2;

View File

@@ -41,6 +41,14 @@ impl GossipPromises {
self.promises.contains_key(message)
}
/// Returns the peers to which we have sent an IWANT request for the given
/// message id, or an empty list if no promise is tracked for it.
pub(crate) fn peers_for_message(&self, message_id: &MessageId) -> Vec<PeerId> {
    self.promises
        .get(message_id)
        .map(|peers| peers.keys().copied().collect())
        .unwrap_or_default()
}
/// Track a promise to deliver a message from a list of [`MessageId`]s we are requesting.
pub(crate) fn add_promise(&mut self, peer: PeerId, messages: &[MessageId], expires: Instant) {
for message_id in messages {

View File

@@ -179,6 +179,12 @@ pub(crate) struct Metrics {
/// topic. A very high metric might indicate an underperforming network.
topic_iwant_msgs: Family<TopicHash, Counter>,
/// The number of times we have received an IDONTWANT control message.
idontwant_msgs: Counter,
/// The number of msg_id's we have received in every IDONTWANT control message.
idontwant_msgs_ids: Counter,
/// The size of the priority queue.
priority_queue_size: Histogram,
/// The size of the non-priority queue.
@@ -311,6 +317,27 @@ impl Metrics {
"topic_iwant_msgs",
"Number of times we have decided an IWANT is required for this topic"
);
let idontwant_msgs = {
let metric = Counter::default();
registry.register(
"idontwant_msgs",
"The number of times we have received an IDONTWANT control message",
metric.clone(),
);
metric
};
let idontwant_msgs_ids = {
let metric = Counter::default();
registry.register(
"idontwant_msgs_ids",
"The number of msg_id's we have received in every IDONTWANT control message.",
metric.clone(),
);
metric
};
let memcache_misses = {
let metric = Counter::default();
registry.register(
@@ -362,6 +389,8 @@ impl Metrics {
heartbeat_duration,
memcache_misses,
topic_iwant_msgs,
idontwant_msgs,
idontwant_msgs_ids,
priority_queue_size,
non_priority_queue_size,
}
@@ -560,6 +589,12 @@ impl Metrics {
}
}
/// Register receiving an IDONTWANT control message: counts the message
/// itself and the number of message ids (`msgs`) it carried.
pub(crate) fn register_idontwant(&mut self, msgs: usize) {
    self.idontwant_msgs.inc();
    self.idontwant_msgs_ids.inc_by(msgs as u64);
}
/// Observes a heartbeat duration.
pub(crate) fn observe_heartbeat_duration(&mut self, millis: u64) {
self.heartbeat_duration.observe(millis as f64);

View File

@@ -23,8 +23,8 @@ use super::handler::HandlerEvent;
use super::rpc_proto::proto;
use super::topic::TopicHash;
use super::types::{
ControlAction, Graft, IHave, IWant, MessageId, PeerInfo, PeerKind, Prune, RawMessage, Rpc,
Subscription, SubscriptionAction,
ControlAction, Graft, IDontWant, IHave, IWant, MessageId, PeerInfo, PeerKind, Prune,
RawMessage, Rpc, Subscription, SubscriptionAction,
};
use super::ValidationError;
use asynchronous_codec::{Decoder, Encoder, Framed};
@@ -40,6 +40,10 @@ use void::Void;
pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:";
pub(crate) const GOSSIPSUB_1_2_0_BETA_PROTOCOL: ProtocolId = ProtocolId {
protocol: StreamProtocol::new("/meshsub/1.2.0"),
kind: PeerKind::Gossipsubv1_2_beta,
};
pub(crate) const GOSSIPSUB_1_1_0_PROTOCOL: ProtocolId = ProtocolId {
protocol: StreamProtocol::new("/meshsub/1.1.0"),
kind: PeerKind::Gossipsubv1_1,
@@ -69,7 +73,11 @@ impl Default for ProtocolConfig {
Self {
max_transmit_size: 65536,
validation_mode: ValidationMode::Strict,
protocol_ids: vec![GOSSIPSUB_1_1_0_PROTOCOL, GOSSIPSUB_1_0_0_PROTOCOL],
protocol_ids: vec![
GOSSIPSUB_1_2_0_BETA_PROTOCOL,
GOSSIPSUB_1_1_0_PROTOCOL,
GOSSIPSUB_1_0_0_PROTOCOL,
],
}
}
}
@@ -476,10 +484,25 @@ impl Decoder for GossipsubCodec {
}));
}
let idontwant_msgs: Vec<ControlAction> = rpc_control
.idontwant
.into_iter()
.map(|idontwant| {
ControlAction::IDontWant(IDontWant {
message_ids: idontwant
.message_ids
.into_iter()
.map(MessageId::from)
.collect::<Vec<_>>(),
})
})
.collect();
control_msgs.extend(ihave_msgs);
control_msgs.extend(iwant_msgs);
control_msgs.extend(graft_msgs);
control_msgs.extend(prune_msgs);
control_msgs.extend(idontwant_msgs);
}
Ok(Some(HandlerEvent::Message {

View File

@@ -25,6 +25,7 @@ use async_channel::{Receiver, Sender};
use futures::stream::Peekable;
use futures::{Future, Stream, StreamExt};
use futures_timer::Delay;
use hashlink::LinkedHashMap;
use libp2p::identity::PeerId;
use libp2p::swarm::ConnectionId;
use prometheus_client::encoding::EncodeLabelValue;
@@ -34,6 +35,7 @@ use std::fmt::Debug;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
use std::{fmt, pin::Pin};
use web_time::Duration;
@@ -121,11 +123,16 @@ pub(crate) struct PeerConnections {
pub(crate) sender: RpcSender,
/// Subscribed topics.
pub(crate) topics: BTreeSet<TopicHash>,
/// Don't send messages.
pub(crate) dont_send: LinkedHashMap<MessageId, Instant>,
}
/// Describes the types of peers that can exist in the gossipsub context.
#[derive(Debug, Clone, PartialEq, Hash, EncodeLabelValue, Eq)]
#[allow(non_camel_case_types)]
pub enum PeerKind {
/// A gossipsub 1.2 peer.
Gossipsubv1_2_beta,
/// A gossipsub 1.1 peer.
Gossipsubv1_1,
/// A gossipsub 1.0 peer.
@@ -136,6 +143,16 @@ pub enum PeerKind {
NotSupported,
}
impl PeerKind {
    /// Returns true if the peer speaks any version of the gossipsub
    /// protocol (v1.0, v1.1 or v1.2-beta); false for floodsub or
    /// unsupported peers. New versions must be added here explicitly.
    pub(crate) fn is_gossipsub(&self) -> bool {
        matches!(
            self,
            Self::Gossipsubv1_2_beta | Self::Gossipsubv1_1 | Self::Gossipsub
        )
    }
}
/// A message received by the gossipsub system and stored locally in caches..
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct RawMessage {
@@ -257,6 +274,8 @@ pub enum ControlAction {
Graft(Graft),
/// The node has been removed from the mesh - Prune control message.
Prune(Prune),
/// The node requests us to not forward message ids (peer_id + sequence_number) - IDontWant control message.
IDontWant(IDontWant),
}
/// Node broadcasts known messages per topic - IHave control message.
@@ -293,6 +312,13 @@ pub struct Prune {
pub(crate) backoff: Option<u64>,
}
/// The node requests us to not forward message ids - IDontWant control message.
/// Introduced with gossipsub v1.2.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IDontWant {
    /// A list of known message ids.
    pub(crate) message_ids: Vec<MessageId>,
}
/// A Gossipsub RPC message sent.
#[derive(Debug)]
pub enum RpcOut {
@@ -314,6 +340,8 @@ pub enum RpcOut {
IHave(IHave),
/// Send a IWant control message.
IWant(IWant),
/// Send a IDontWant control message.
IDontWant(IDontWant),
}
impl RpcOut {
@@ -374,6 +402,7 @@ impl From<RpcOut> for proto::RPC {
iwant: vec![],
graft: vec![],
prune: vec![],
idontwant: vec![],
}),
},
RpcOut::IWant(IWant { message_ids }) => proto::RPC {
@@ -386,6 +415,7 @@ impl From<RpcOut> for proto::RPC {
}],
graft: vec![],
prune: vec![],
idontwant: vec![],
}),
},
RpcOut::Graft(Graft { topic_hash }) => proto::RPC {
@@ -398,6 +428,7 @@ impl From<RpcOut> for proto::RPC {
topic_id: Some(topic_hash.into_string()),
}],
prune: vec![],
idontwant: vec![],
}),
},
RpcOut::Prune(Prune {
@@ -424,9 +455,23 @@ impl From<RpcOut> for proto::RPC {
.collect(),
backoff,
}],
idontwant: vec![],
}),
}
}
RpcOut::IDontWant(IDontWant { message_ids }) => proto::RPC {
publish: Vec::new(),
subscriptions: Vec::new(),
control: Some(proto::ControlMessage {
ihave: vec![],
iwant: vec![],
graft: vec![],
prune: vec![],
idontwant: vec![proto::ControlIDontWant {
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
}],
}),
},
}
}
}
@@ -485,6 +530,7 @@ impl From<Rpc> for proto::RPC {
iwant: Vec::new(),
graft: Vec::new(),
prune: Vec::new(),
idontwant: Vec::new(),
};
let empty_control_msg = rpc.control_msgs.is_empty();
@@ -533,6 +579,12 @@ impl From<Rpc> for proto::RPC {
};
control.prune.push(rpc_prune);
}
ControlAction::IDontWant(IDontWant { message_ids }) => {
let rpc_idontwant = proto::ControlIDontWant {
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
};
control.idontwant.push(rpc_idontwant);
}
}
}
@@ -571,6 +623,7 @@ impl PeerKind {
Self::Floodsub => "Floodsub",
Self::Gossipsub => "Gossipsub v1.0",
Self::Gossipsubv1_1 => "Gossipsub v1.1",
Self::Gossipsubv1_2_beta => "Gossipsub v1.2-beta",
}
}
}
@@ -657,6 +710,15 @@ impl RpcSender {
.map_err(|err| err.into_inner())
}
/// Send a `RpcOut::IDontWant` message to the `RpcReceiver`.
/// This is low priority; if the queue is full an Err carrying the
/// undelivered message is returned.
#[allow(clippy::result_large_err)]
pub(crate) fn idontwant(&mut self, idontwant: IDontWant) -> Result<(), RpcOut> {
    self.non_priority_sender
        .try_send(RpcOut::IDontWant(idontwant))
        .map_err(|err| err.into_inner())
}
/// Send a `RpcOut::Subscribe` message to the `RpcReceiver`
/// this is high priority.
pub(crate) fn subscribe(&mut self, topic: TopicHash) {

View File

@@ -5,6 +5,7 @@ use crate::types::{
};
use crate::{GossipTopic, NetworkConfig};
use futures::future::Either;
use gossipsub;
use libp2p::core::{multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::Boxed};
use libp2p::identity::{secp256k1, Keypair};
use libp2p::{core, noise, yamux, PeerId, Transport};