Add additional metrics for idontwant (#6578)

* Add additional metrics for idontwant

* Resolve issues from review

* Fix tests

* Don't exceed capacity

* Apply suggestions from code review

Co-authored-by: João Oliveira <hello@jxs.pt>

* Return early on failure

* Add comment
This commit is contained in:
Pawan Dhananjay
2024-11-21 03:57:13 +05:30
committed by GitHub
parent b1e9f69460
commit 94311c6516
4 changed files with 86 additions and 18 deletions

View File

@@ -1385,7 +1385,7 @@ where
"IWANT: Peer has asked for message too many times; ignoring request"
);
} else if let Some(peer) = &mut self.connected_peers.get_mut(peer_id) {
if peer.dont_send.get(&id).is_some() {
if peer.dont_send_received.get(&id).is_some() {
tracing::debug!(%peer_id, message=%id, "Peer already sent IDONTWANT for this message");
continue;
}
@@ -1817,6 +1817,15 @@ where
// Calculate the message id on the transformed data.
let msg_id = self.config.message_id(&message);
if let Some(metrics) = self.metrics.as_mut() {
if let Some(peer) = self.connected_peers.get_mut(propagation_source) {
// Record if we received a message that we already sent a IDONTWANT for to the peer
if peer.dont_send_sent.contains_key(&msg_id) {
metrics.register_idontwant_messages_ignored_per_topic(&raw_message.topic);
}
}
}
// Check the validity of the message
// Peers get penalized if this message is invalid. We don't add it to the duplicate cache
// and instead continually penalize peers that repeatedly send this message.
@@ -2512,11 +2521,19 @@ where
// Flush stale IDONTWANTs.
for peer in self.connected_peers.values_mut() {
while let Some((_front, instant)) = peer.dont_send.front() {
while let Some((_front, instant)) = peer.dont_send_received.front() {
if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() {
break;
} else {
peer.dont_send.pop_front();
peer.dont_send_received.pop_front();
}
}
// If metrics are not enabled, this queue would be empty.
while let Some((_front, instant)) = peer.dont_send_sent.front() {
if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() {
break;
} else {
peer.dont_send_sent.pop_front();
}
}
}
@@ -2751,6 +2768,16 @@ where
.entry(*peer_id)
.or_default()
.non_priority += 1;
return;
}
// IDONTWANT sent successfully.
if let Some(metrics) = self.metrics.as_mut() {
peer.dont_send_sent.insert(msg_id.clone(), Instant::now());
// Don't exceed capacity.
if peer.dont_send_sent.len() > IDONTWANT_CAP {
peer.dont_send_sent.pop_front();
}
metrics.register_idontwant_messages_sent_per_topic(&message.topic);
}
}
}
@@ -2808,7 +2835,7 @@ where
if !recipient_peers.is_empty() {
for peer_id in recipient_peers.iter() {
if let Some(peer) = self.connected_peers.get_mut(peer_id) {
if peer.dont_send.get(msg_id).is_some() {
if peer.dont_send_received.get(msg_id).is_some() {
tracing::debug!(%peer_id, message=%msg_id, "Peer doesn't want message");
continue;
}
@@ -3162,7 +3189,8 @@ where
connections: vec![],
sender: RpcSender::new(self.config.connection_handler_queue_len()),
topics: Default::default(),
dont_send: LinkedHashMap::new(),
dont_send_received: LinkedHashMap::new(),
dont_send_sent: LinkedHashMap::new(),
});
// Add the new connection
connected_peer.connections.push(connection_id);
@@ -3194,7 +3222,8 @@ where
connections: vec![],
sender: RpcSender::new(self.config.connection_handler_queue_len()),
topics: Default::default(),
dont_send: LinkedHashMap::new(),
dont_send_received: LinkedHashMap::new(),
dont_send_sent: LinkedHashMap::new(),
});
// Add the new connection
connected_peer.connections.push(connection_id);
@@ -3366,10 +3395,10 @@ where
metrics.register_idontwant_bytes(idontwant_size);
}
for message_id in message_ids {
peer.dont_send.insert(message_id, Instant::now());
peer.dont_send_received.insert(message_id, Instant::now());
// Don't exceed capacity.
if peer.dont_send.len() > IDONTWANT_CAP {
peer.dont_send.pop_front();
if peer.dont_send_received.len() > IDONTWANT_CAP {
peer.dont_send_received.pop_front();
}
}
}