Update libp2p (#9331)

Update libp2p to benefit from recent improvements, including partial messages bugfixes.


  


Co-Authored-By: Daniel Knopik <daniel@dknopik.de>
This commit is contained in:
Daniel Knopik
2026-06-05 16:50:19 +02:00
committed by GitHub
parent 6698872f8a
commit e78e1d38ba
7 changed files with 549 additions and 223 deletions

View File

@@ -508,7 +508,9 @@ pub fn gossipsub_config(
.fanout_ttl(Duration::from_secs(60))
.history_length(12)
.flood_publish(false)
.max_messages_per_rpc(Some(500)) // Responses to IWANT can be quite large
.max_publish_messages(500) // Responses to IWANT can be quite large
.max_control_messages_sent(500)
.max_control_message_size(128 << 10) // 128KB
.history_gossip(load.history_gossip)
.validate_messages() // require validation before propagation
.validation_mode(gossipsub::ValidationMode::Anonymous)

View File

@@ -311,11 +311,8 @@ impl<E: EthSpec> Network<E> {
let fork = ctx.chain_spec.fork_name_at_epoch(epoch);
all_topics_at_fork::<E>(fork, &ctx.chain_spec)
.into_iter()
.map(|topic| {
Topic::new(GossipTopic::new(topic, GossipEncoding::default(), digest))
.into()
})
.collect::<Vec<TopicHash>>()
.map(|topic| GossipTopic::new(topic, GossipEncoding::default(), digest))
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
@@ -368,11 +365,20 @@ impl<E: EthSpec> Network<E> {
gossipsub.add_explicit_peer(&PeerId::from(explicit_peer.clone()));
}
// Register topics with enabled partial messages
for topic in all_topics_for_digests.iter().flatten() {
if topic.kind().use_partial_messages(&config) {
gossipsub.enable_partials_for_topic(Topic::new(topic.clone()).hash(), true);
}
}
// If we are using metrics, then register which topics we want to make sure to keep
// track of
if ctx.libp2p_registry.is_some() {
for topics in all_topics_for_digests {
gossipsub.register_topics_for_metrics(topics);
gossipsub.register_topics_for_metrics(
topics.into_iter().map(|t| Topic::new(t).hash()).collect(),
);
}
}
@@ -823,18 +829,9 @@ impl<E: EthSpec> Network<E> {
.write()
.insert(topic.clone());
let partial = topic
.kind()
.use_partial_messages(self.network_globals.config.as_ref());
let topic: Topic = topic.into();
let subscribe_result = if partial {
self.gossipsub_mut().subscribe_partial(&topic, true)
} else {
self.gossipsub_mut().subscribe(&topic)
};
match subscribe_result {
match self.gossipsub_mut().subscribe(&topic) {
Err(e) => {
warn!(%topic, error = ?e, "Failed to subscribe to topic");
false
@@ -1381,9 +1378,9 @@ impl<E: EthSpec> Network<E> {
/* Sub-behaviour event handling functions */
/// Handle a gossipsub event.
fn inject_gs_event(&mut self, event: gossipsub::Event) -> Option<NetworkEvent<E>> {
fn inject_gs_event(&mut self, event: Event) -> Option<NetworkEvent<E>> {
match event {
gossipsub::Event::Message {
Event::Message {
propagation_source,
message_id: id,
message: gs_msg,
@@ -1461,7 +1458,7 @@ impl<E: EthSpec> Network<E> {
}
}
}
gossipsub::Event::Subscribed { peer_id, topic } => {
Event::Subscribed { peer_id, topic, .. } => {
if let Ok(topic) = GossipTopic::decode(topic.as_str()) {
if let Some(subnet_id) = topic.subnet_id() {
self.network_globals
@@ -1513,7 +1510,7 @@ impl<E: EthSpec> Network<E> {
}
}
}
gossipsub::Event::Unsubscribed { peer_id, topic } => {
Event::Unsubscribed { peer_id, topic } => {
if let Some(subnet_id) = subnet_from_topic_hash(&topic) {
self.network_globals
.peers
@@ -1521,7 +1518,7 @@ impl<E: EthSpec> Network<E> {
.remove_subscription(&peer_id, &subnet_id);
}
}
gossipsub::Event::GossipsubNotSupported { peer_id } => {
Event::GossipsubNotSupported { peer_id } => {
debug!(%peer_id, "Peer does not support gossipsub");
self.peer_manager_mut().report_peer(
&peer_id,
@@ -1531,7 +1528,7 @@ impl<E: EthSpec> Network<E> {
"does_not_support_gossipsub",
);
}
gossipsub::Event::SlowPeer {
Event::SlowPeer {
peer_id,
failed_messages,
} => {

View File

@@ -1,7 +1,7 @@
//! Handles the encoding and decoding of pubsub messages.
use crate::types::{GossipEncoding, GossipKind, GossipTopic};
use gossipsub::TopicHash;
use libp2p::gossipsub::{DataTransform, Message, RawMessage, TopicHash};
use snap::raw::{Decoder, Encoder, decompress_len};
use ssz::{Decode, Encode};
use std::io::{Error, ErrorKind};
@@ -73,12 +73,9 @@ impl SnappyTransform {
}
}
impl gossipsub::DataTransform for SnappyTransform {
impl DataTransform for SnappyTransform {
// Provides the snappy decompression from RawGossipsubMessages
fn inbound_transform(
&self,
raw_message: gossipsub::RawMessage,
) -> Result<gossipsub::Message, std::io::Error> {
fn inbound_transform(&self, raw_message: RawMessage) -> Result<Message, std::io::Error> {
// first check the size of the compressed payload
if raw_message.data.len() > self.max_compressed_len {
return Err(Error::new(
@@ -99,7 +96,7 @@ impl gossipsub::DataTransform for SnappyTransform {
let decompressed_data = decoder.decompress_vec(&raw_message.data)?;
// Build the GossipsubMessage struct
Ok(gossipsub::Message {
Ok(Message {
source: raw_message.source,
data: decompressed_data,
sequence_number: raw_message.sequence_number,