Unsubscribe blob topics at Fulu fork (#6932)

Addresses #6854.

PeerDAS requires unsubscribing a Gossip topic at a fork boundary. This is not possible with our current topic machinery.


  Instead of defining which topics have to be **added** at a given fork, we define the complete set of topics at a given fork. The new start of the show and key function is:

```rust
pub fn core_topics_to_subscribe<E: EthSpec>(
fork_name: ForkName,
opts: &TopicConfig,
spec: &ChainSpec,
) -> Vec<GossipKind> {
// ...
if fork_name.deneb_enabled() && !fork_name.fulu_enabled() {
// All of deneb blob topics are core topics
for i in 0..spec.blob_sidecar_subnet_count(fork_name) {
topics.push(GossipKind::BlobSidecar(i));
}
}
// ...
}
```

`core_topics_to_subscribe` only returns the blob topics if `fork < Fulu`. Then at the fork boundary, we subscribe with the new fork digest to `core_topics_to_subscribe(next_fork)`, which excludes the blob topics.

I added `is_fork_non_core_topic` to carry on to the next fork the aggregator topics for attestations and sync committee messages. This approach is future-proof if those topics ever become fork-dependent.

Closes https://github.com/sigp/lighthouse/issues/6854
This commit is contained in:
Lion - dapplion
2025-02-11 20:40:14 -03:00
committed by GitHub
parent 431dd7c398
commit 0055af56b6
6 changed files with 238 additions and 206 deletions

View File

@@ -181,8 +181,6 @@ pub struct NetworkService<T: BeaconChainTypes> {
next_fork_subscriptions: Pin<Box<OptionFuture<Sleep>>>,
/// A delay that expires when we need to unsubscribe from old fork topics.
next_unsubscribe: Pin<Box<OptionFuture<Sleep>>>,
/// Subscribe to all the subnets once synced.
subscribe_all_subnets: bool,
/// Shutdown beacon node after sync is complete.
shutdown_after_sync: bool,
/// Whether metrics are enabled or not.
@@ -191,8 +189,6 @@ pub struct NetworkService<T: BeaconChainTypes> {
metrics_update: tokio::time::Interval,
/// gossipsub_parameter_update timer
gossipsub_parameter_update: tokio::time::Interval,
/// enable_light_client_server indicator
enable_light_client_server: bool,
/// The logger for the network service.
fork_context: Arc<ForkContext>,
log: slog::Logger,
@@ -347,14 +343,12 @@ impl<T: BeaconChainTypes> NetworkService<T> {
next_fork_update,
next_fork_subscriptions,
next_unsubscribe,
subscribe_all_subnets: config.subscribe_all_subnets,
shutdown_after_sync: config.shutdown_after_sync,
metrics_enabled: config.metrics_enabled,
metrics_update,
gossipsub_parameter_update,
fork_context,
log: network_log,
enable_light_client_server: config.enable_light_client_server,
};
Ok((network_service, network_globals, network_senders))
@@ -713,8 +707,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
let mut subscribed_topics: Vec<GossipTopic> = vec![];
for topic_kind in core_topics_to_subscribe::<T::EthSpec>(
self.fork_context.current_fork(),
&self.fork_context.spec,
&self.network_globals.as_topic_config(),
&self.fork_context.spec,
) {
for fork_digest in self.required_gossip_fork_digests() {
let topic = GossipTopic::new(
@@ -730,57 +724,18 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}
if self.enable_light_client_server {
for light_client_topic_kind in
lighthouse_network::types::LIGHT_CLIENT_GOSSIP_TOPICS.iter()
{
for fork_digest in self.required_gossip_fork_digests() {
let light_client_topic = GossipTopic::new(
light_client_topic_kind.clone(),
GossipEncoding::default(),
fork_digest,
);
if self.libp2p.subscribe(light_client_topic.clone()) {
subscribed_topics.push(light_client_topic);
} else {
warn!(self.log, "Could not subscribe to topic"; "topic" => %light_client_topic);
}
}
}
}
// If we are to subscribe to all subnets we do it here
if self.subscribe_all_subnets {
if self.network_globals.config.subscribe_all_subnets {
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {
let subnet = Subnet::Attestation(SubnetId::new(subnet_id));
// Update the ENR bitfield
self.libp2p.update_enr_subnet(subnet, true);
for fork_digest in self.required_gossip_fork_digests() {
let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
if self.libp2p.subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
}
let subnet_max = <<T as BeaconChainTypes>::EthSpec as EthSpec>::SyncCommitteeSubnetCount::to_u64();
for subnet_id in 0..subnet_max {
let subnet = Subnet::SyncCommittee(SyncSubnetId::new(subnet_id));
// Update the ENR bitfield
self.libp2p.update_enr_subnet(subnet, true);
for fork_digest in self.required_gossip_fork_digests() {
let topic = GossipTopic::new(
subnet.into(),
GossipEncoding::default(),
fork_digest,
);
if self.libp2p.subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
}
}
@@ -909,8 +864,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
fn subscribed_core_topics(&self) -> bool {
let core_topics = core_topics_to_subscribe::<T::EthSpec>(
self.fork_context.current_fork(),
&self.fork_context.spec,
&self.network_globals.as_topic_config(),
&self.fork_context.spec,
);
let core_topics: HashSet<&GossipKind> = HashSet::from_iter(&core_topics);
let subscriptions = self.network_globals.gossipsub_subscriptions.read();