Subscribe to PeerDAS topics on Fulu fork (#6849)

`TODO(das)` now that PeerDAS is scheduled in a hard fork we can subscribe to its topics on the fork activation. In current stable we subscribe to PeerDAS topics as soon as the node starts if PeerDAS is scheduled.

This PR adds another todo to unsubscribe to blob topics at the fork. This other PR included solution for that, but I can include it in a separate PR
- https://github.com/sigp/lighthouse/pull/5899/files


  Include PeerDAS topics as part of Fulu fork in `fork_core_topics`.
This commit is contained in:
Lion - dapplion
2025-02-03 03:07:39 -03:00
committed by GitHub
parent a088b0b6c4
commit 55d1e754b4
5 changed files with 62 additions and 50 deletions

View File

@@ -33,8 +33,8 @@ use task_executor::ShutdownReason;
use tokio::sync::mpsc;
use tokio::time::Sleep;
use types::{
ChainSpec, DataColumnSubnetId, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription,
SyncSubnetId, Unsigned, ValidatorSubscription,
ChainSpec, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId,
Unsigned, ValidatorSubscription,
};
mod tests;
@@ -181,8 +181,6 @@ pub struct NetworkService<T: BeaconChainTypes> {
next_fork_subscriptions: Pin<Box<OptionFuture<Sleep>>>,
/// A delay that expires when we need to unsubscribe from old fork topics.
next_unsubscribe: Pin<Box<OptionFuture<Sleep>>>,
/// Subscribe to all the data column subnets.
subscribe_all_data_column_subnets: bool,
/// Subscribe to all the subnets once synced.
subscribe_all_subnets: bool,
/// Shutdown beacon node after sync is complete.
@@ -349,7 +347,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
next_fork_update,
next_fork_subscriptions,
next_unsubscribe,
subscribe_all_data_column_subnets: config.subscribe_all_data_column_subnets,
subscribe_all_subnets: config.subscribe_all_subnets,
shutdown_after_sync: config.shutdown_after_sync,
metrics_enabled: config.metrics_enabled,
@@ -717,6 +714,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
for topic_kind in core_topics_to_subscribe::<T::EthSpec>(
self.fork_context.current_fork(),
&self.fork_context.spec,
&self.network_globals.as_topic_config(),
) {
for fork_digest in self.required_gossip_fork_digests() {
let topic = GossipTopic::new(
@@ -751,10 +749,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}
if self.fork_context.spec.is_peer_das_scheduled() {
self.subscribe_to_peer_das_topics(&mut subscribed_topics);
}
// If we are to subscribe to all subnets we do it here
if self.subscribe_all_subnets {
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {
@@ -801,37 +795,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}
/// Keeping these separate from core topics because it has custom logic:
/// 1. Data column subscription logic depends on subscription configuration.
/// 2. Data column topic subscriptions will be dynamic based on validator balances due to
/// validator custody.
///
/// TODO(das): The downside with not including it in core fork topic is - we subscribe to
/// PeerDAS topics on startup if Fulu is scheduled, rather than waiting until the fork.
/// If this is an issue we could potentially consider adding the logic to
/// `network.subscribe_new_fork_topics()`.
fn subscribe_to_peer_das_topics(&mut self, subscribed_topics: &mut Vec<GossipTopic>) {
let column_subnets_to_subscribe = if self.subscribe_all_data_column_subnets {
&(0..self.fork_context.spec.data_column_sidecar_subnet_count)
.map(DataColumnSubnetId::new)
.collect()
} else {
&self.network_globals.sampling_subnets
};
for column_subnet in column_subnets_to_subscribe.iter() {
for fork_digest in self.required_gossip_fork_digests() {
let gossip_kind = Subnet::DataColumn(*column_subnet).into();
let topic = GossipTopic::new(gossip_kind, GossipEncoding::default(), fork_digest);
if self.libp2p.subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
}
}
/// Handle a message sent to the network service.
async fn on_validator_subscription_msg(&mut self, msg: ValidatorSubscriptionMessage) {
match msg {
@@ -947,6 +910,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
let core_topics = core_topics_to_subscribe::<T::EthSpec>(
self.fork_context.current_fork(),
&self.fork_context.spec,
&self.network_globals.as_topic_config(),
);
let core_topics: HashSet<&GossipKind> = HashSet::from_iter(&core_topics);
let subscriptions = self.network_globals.gossipsub_subscriptions.read();