Unsubscribe blob topics at Fulu fork (#6932)

Addresses #6854.

PeerDAS requires unsubscribing a Gossip topic at a fork boundary. This is not possible with our current topic machinery.


  Instead of defining which topics have to be **added** at a given fork, we define the complete set of topics active at a given fork. The new key function at the center of this change is:

```rust
pub fn core_topics_to_subscribe<E: EthSpec>(
    fork_name: ForkName,
    opts: &TopicConfig,
    spec: &ChainSpec,
) -> Vec<GossipKind> {
    // ...
    if fork_name.deneb_enabled() && !fork_name.fulu_enabled() {
        // All of deneb blob topics are core topics
        for i in 0..spec.blob_sidecar_subnet_count(fork_name) {
            topics.push(GossipKind::BlobSidecar(i));
        }
    }
    // ...
}
```

`core_topics_to_subscribe` only returns the blob topics if `fork < Fulu`. Then at the fork boundary, we subscribe with the new fork digest to `core_topics_to_subscribe(next_fork)`, which excludes the blob topics.

I added `is_fork_non_core_topic` to carry over to the next fork the aggregator topics for attestations and sync committee messages. This approach is future-proof in case those topics ever become fork-dependent.

Closes https://github.com/sigp/lighthouse/issues/6854
This commit is contained in:
Lion - dapplion
2025-02-11 20:40:14 -03:00
committed by GitHub
parent 431dd7c398
commit 0055af56b6
6 changed files with 238 additions and 206 deletions

View File

@@ -14,9 +14,8 @@ use crate::rpc::{
RequestType, ResponseTermination, RpcErrorResponse, RpcResponse, RpcSuccessResponse, RPC, RequestType, ResponseTermination, RpcErrorResponse, RpcResponse, RpcSuccessResponse, RPC,
}; };
use crate::types::{ use crate::types::{
attestation_sync_committee_topics, fork_core_topics, subnet_from_topic_hash, GossipEncoding, all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash,
GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, ALTAIR_CORE_TOPICS, GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery,
BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
}; };
use crate::EnrExt; use crate::EnrExt;
use crate::Eth2Enr; use crate::Eth2Enr;
@@ -280,14 +279,39 @@ impl<E: EthSpec> Network<E> {
// Set up a scoring update interval // Set up a scoring update interval
let update_gossipsub_scores = tokio::time::interval(params.decay_interval); let update_gossipsub_scores = tokio::time::interval(params.decay_interval);
let max_topics = ctx.chain_spec.attestation_subnet_count as usize let current_and_future_forks = ForkName::list_all().into_iter().filter_map(|fork| {
+ SYNC_COMMITTEE_SUBNET_COUNT as usize if fork >= ctx.fork_context.current_fork() {
+ ctx.chain_spec.blob_sidecar_subnet_count_max() as usize ctx.fork_context
+ ctx.chain_spec.data_column_sidecar_subnet_count as usize .to_context_bytes(fork)
+ BASE_CORE_TOPICS.len() .map(|fork_digest| (fork, fork_digest))
+ ALTAIR_CORE_TOPICS.len() } else {
+ CAPELLA_CORE_TOPICS.len() // 0 core deneb and electra topics None
+ LIGHT_CLIENT_GOSSIP_TOPICS.len(); }
});
let all_topics_for_forks = current_and_future_forks
.map(|(fork, fork_digest)| {
all_topics_at_fork::<E>(fork, &ctx.chain_spec)
.into_iter()
.map(|topic| {
Topic::new(GossipTopic::new(
topic,
GossipEncoding::default(),
fork_digest,
))
.into()
})
.collect::<Vec<TopicHash>>()
})
.collect::<Vec<_>>();
// For simplicity find the fork with the most individual topics and assume all forks
// have the same topic count
let max_topics_at_any_fork = all_topics_for_forks
.iter()
.map(|topics| topics.len())
.max()
.expect("each fork has at least 5 hardcoded core topics");
let possible_fork_digests = ctx.fork_context.all_fork_digests(); let possible_fork_digests = ctx.fork_context.all_fork_digests();
let filter = gossipsub::MaxCountSubscriptionFilter { let filter = gossipsub::MaxCountSubscriptionFilter {
@@ -297,9 +321,9 @@ impl<E: EthSpec> Network<E> {
SYNC_COMMITTEE_SUBNET_COUNT, SYNC_COMMITTEE_SUBNET_COUNT,
), ),
// during a fork we subscribe to both the old and new topics // during a fork we subscribe to both the old and new topics
max_subscribed_topics: max_topics * 4, max_subscribed_topics: max_topics_at_any_fork * 4,
// 424 in theory = (64 attestation + 4 sync committee + 7 core topics + 9 blob topics + 128 column topics) * 2 // 424 in theory = (64 attestation + 4 sync committee + 7 core topics + 9 blob topics + 128 column topics) * 2
max_subscriptions_per_request: max_topics * 2, max_subscriptions_per_request: max_topics_at_any_fork * 2,
}; };
// If metrics are enabled for libp2p build the configuration // If metrics are enabled for libp2p build the configuration
@@ -332,17 +356,9 @@ impl<E: EthSpec> Network<E> {
// If we are using metrics, then register which topics we want to make sure to keep // If we are using metrics, then register which topics we want to make sure to keep
// track of // track of
if ctx.libp2p_registry.is_some() { if ctx.libp2p_registry.is_some() {
let topics_to_keep_metrics_for = attestation_sync_committee_topics::<E>() for topics in all_topics_for_forks {
.map(|gossip_kind| { gossipsub.register_topics_for_metrics(topics);
Topic::from(GossipTopic::new( }
gossip_kind,
GossipEncoding::default(),
enr_fork_id.fork_digest,
))
.into()
})
.collect::<Vec<TopicHash>>();
gossipsub.register_topics_for_metrics(topics_to_keep_metrics_for);
} }
(gossipsub, update_gossipsub_scores) (gossipsub, update_gossipsub_scores)
@@ -700,38 +716,26 @@ impl<E: EthSpec> Network<E> {
/// Subscribe to all required topics for the `new_fork` with the given `new_fork_digest`. /// Subscribe to all required topics for the `new_fork` with the given `new_fork_digest`.
pub fn subscribe_new_fork_topics(&mut self, new_fork: ForkName, new_fork_digest: [u8; 4]) { pub fn subscribe_new_fork_topics(&mut self, new_fork: ForkName, new_fork_digest: [u8; 4]) {
// Subscribe to existing topics with new fork digest // Re-subscribe to non-core topics with the new fork digest
let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone(); let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone();
for mut topic in subscriptions.into_iter() { for mut topic in subscriptions.into_iter() {
topic.fork_digest = new_fork_digest; if is_fork_non_core_topic(&topic, new_fork) {
self.subscribe(topic); topic.fork_digest = new_fork_digest;
self.subscribe(topic);
}
} }
// Subscribe to core topics for the new fork // Subscribe to core topics for the new fork
for kind in fork_core_topics::<E>( for kind in core_topics_to_subscribe::<E>(
&new_fork, new_fork,
&self.fork_context.spec,
&self.network_globals.as_topic_config(), &self.network_globals.as_topic_config(),
&self.fork_context.spec,
) { ) {
let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest); let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest);
self.subscribe(topic); self.subscribe(topic);
} }
// TODO(das): unsubscribe from blob topics at the Fulu fork // Already registered all possible gossipsub topics for metrics
// Register the new topics for metrics
let topics_to_keep_metrics_for = attestation_sync_committee_topics::<E>()
.map(|gossip_kind| {
Topic::from(GossipTopic::new(
gossip_kind,
GossipEncoding::default(),
new_fork_digest,
))
.into()
})
.collect::<Vec<TopicHash>>();
self.gossipsub_mut()
.register_topics_for_metrics(topics_to_keep_metrics_for);
} }
/// Unsubscribe from all topics that doesn't have the given fork_digest /// Unsubscribe from all topics that doesn't have the given fork_digest

View File

@@ -187,6 +187,8 @@ impl<E: EthSpec> NetworkGlobals<E> {
/// Returns the TopicConfig to compute the set of Gossip topics for a given fork /// Returns the TopicConfig to compute the set of Gossip topics for a given fork
pub fn as_topic_config(&self) -> TopicConfig { pub fn as_topic_config(&self) -> TopicConfig {
TopicConfig { TopicConfig {
enable_light_client_server: self.config.enable_light_client_server,
subscribe_all_subnets: self.config.subscribe_all_subnets,
subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets, subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets,
sampling_subnets: &self.sampling_subnets, sampling_subnets: &self.sampling_subnets,
} }

View File

@@ -16,7 +16,6 @@ pub use pubsub::{PubsubMessage, SnappyTransform};
pub use subnet::{Subnet, SubnetDiscovery}; pub use subnet::{Subnet, SubnetDiscovery};
pub use sync_state::{BackFillState, SyncState}; pub use sync_state::{BackFillState, SyncState};
pub use topics::{ pub use topics::{
attestation_sync_committee_topics, core_topics_to_subscribe, fork_core_topics, all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash,
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, TopicConfig, GossipEncoding, GossipKind, GossipTopic, TopicConfig,
ALTAIR_CORE_TOPICS, BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
}; };

View File

@@ -25,104 +25,110 @@ pub const BLS_TO_EXECUTION_CHANGE_TOPIC: &str = "bls_to_execution_change";
pub const LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update"; pub const LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update";
pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update"; pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update";
pub const BASE_CORE_TOPICS: [GossipKind; 5] = [
GossipKind::BeaconBlock,
GossipKind::BeaconAggregateAndProof,
GossipKind::VoluntaryExit,
GossipKind::ProposerSlashing,
GossipKind::AttesterSlashing,
];
pub const ALTAIR_CORE_TOPICS: [GossipKind; 1] = [GossipKind::SignedContributionAndProof];
pub const CAPELLA_CORE_TOPICS: [GossipKind; 1] = [GossipKind::BlsToExecutionChange];
pub const LIGHT_CLIENT_GOSSIP_TOPICS: [GossipKind; 2] = [
GossipKind::LightClientFinalityUpdate,
GossipKind::LightClientOptimisticUpdate,
];
#[derive(Debug)] #[derive(Debug)]
pub struct TopicConfig<'a> { pub struct TopicConfig<'a> {
pub enable_light_client_server: bool,
pub subscribe_all_subnets: bool,
pub subscribe_all_data_column_subnets: bool, pub subscribe_all_data_column_subnets: bool,
pub sampling_subnets: &'a HashSet<DataColumnSubnetId>, pub sampling_subnets: &'a HashSet<DataColumnSubnetId>,
} }
/// Returns the core topics associated with each fork that are new to the previous fork /// Returns all the topics the node should subscribe at `fork_name`
pub fn fork_core_topics<E: EthSpec>(
fork_name: &ForkName,
spec: &ChainSpec,
topic_config: &TopicConfig,
) -> Vec<GossipKind> {
match fork_name {
ForkName::Base => BASE_CORE_TOPICS.to_vec(),
ForkName::Altair => ALTAIR_CORE_TOPICS.to_vec(),
ForkName::Bellatrix => vec![],
ForkName::Capella => CAPELLA_CORE_TOPICS.to_vec(),
ForkName::Deneb => {
// All of deneb blob topics are core topics
let mut deneb_blob_topics = Vec::new();
for i in 0..spec.blob_sidecar_subnet_count(ForkName::Deneb) {
deneb_blob_topics.push(GossipKind::BlobSidecar(i));
}
deneb_blob_topics
}
ForkName::Electra => {
// All of electra blob topics are core topics
let mut electra_blob_topics = Vec::new();
for i in 0..spec.blob_sidecar_subnet_count(ForkName::Electra) {
electra_blob_topics.push(GossipKind::BlobSidecar(i));
}
electra_blob_topics
}
ForkName::Fulu => {
let mut topics = vec![];
if topic_config.subscribe_all_data_column_subnets {
for column_subnet in 0..spec.data_column_sidecar_subnet_count {
topics.push(GossipKind::DataColumnSidecar(DataColumnSubnetId::new(
column_subnet,
)));
}
} else {
for column_subnet in topic_config.sampling_subnets {
topics.push(GossipKind::DataColumnSidecar(*column_subnet));
}
}
topics
}
}
}
/// Returns all the attestation and sync committee topics, for a given fork.
pub fn attestation_sync_committee_topics<E: EthSpec>() -> impl Iterator<Item = GossipKind> {
(0..E::SubnetBitfieldLength::to_usize())
.map(|subnet_id| GossipKind::Attestation(SubnetId::new(subnet_id as u64)))
.chain(
(0..E::SyncCommitteeSubnetCount::to_usize()).map(|sync_committee_id| {
GossipKind::SyncCommitteeMessage(SyncSubnetId::new(sync_committee_id as u64))
}),
)
}
/// Returns all the topics that we need to subscribe to for a given fork
/// including topics from older forks and new topics for the current fork.
pub fn core_topics_to_subscribe<E: EthSpec>( pub fn core_topics_to_subscribe<E: EthSpec>(
mut current_fork: ForkName, fork_name: ForkName,
opts: &TopicConfig,
spec: &ChainSpec, spec: &ChainSpec,
topic_config: &TopicConfig,
) -> Vec<GossipKind> { ) -> Vec<GossipKind> {
let mut topics = fork_core_topics::<E>(&current_fork, spec, topic_config); let mut topics = vec![
while let Some(previous_fork) = current_fork.previous_fork() { GossipKind::BeaconBlock,
let previous_fork_topics = fork_core_topics::<E>(&previous_fork, spec, topic_config); GossipKind::BeaconAggregateAndProof,
topics.extend(previous_fork_topics); GossipKind::VoluntaryExit,
current_fork = previous_fork; GossipKind::ProposerSlashing,
GossipKind::AttesterSlashing,
];
if opts.subscribe_all_subnets {
for i in 0..spec.attestation_subnet_count {
topics.push(GossipKind::Attestation(i.into()));
}
} }
// Remove duplicates
if fork_name.altair_enabled() {
topics.push(GossipKind::SignedContributionAndProof);
if opts.subscribe_all_subnets {
for i in 0..E::SyncCommitteeSubnetCount::to_u64() {
topics.push(GossipKind::SyncCommitteeMessage(i.into()));
}
}
if opts.enable_light_client_server {
topics.push(GossipKind::LightClientFinalityUpdate);
topics.push(GossipKind::LightClientOptimisticUpdate);
}
}
if fork_name.capella_enabled() {
topics.push(GossipKind::BlsToExecutionChange);
}
if fork_name.deneb_enabled() && !fork_name.fulu_enabled() {
// All of deneb blob topics are core topics
for i in 0..spec.blob_sidecar_subnet_count(fork_name) {
topics.push(GossipKind::BlobSidecar(i));
}
}
if fork_name.fulu_enabled() {
if opts.subscribe_all_data_column_subnets {
for i in 0..spec.data_column_sidecar_subnet_count {
topics.push(GossipKind::DataColumnSidecar(i.into()));
}
} else {
for subnet in opts.sampling_subnets {
topics.push(GossipKind::DataColumnSidecar(*subnet));
}
}
}
topics topics
.into_iter() }
.collect::<std::collections::HashSet<_>>()
.into_iter() /// Returns true if a given non-core `GossipTopic` MAY be subscribe at this fork.
.collect() ///
/// For example: the `Attestation` topic is not subscribed as a core topic if
/// subscribe_all_subnets = false` but we may subscribe to it outside of a fork
/// boundary if the node is an aggregator.
pub fn is_fork_non_core_topic(topic: &GossipTopic, _fork_name: ForkName) -> bool {
match topic.kind() {
// Node may be aggregator of attestation and sync_committee_message topics for all known
// forks
GossipKind::Attestation(_) | GossipKind::SyncCommitteeMessage(_) => true,
// All these topics are core-only
GossipKind::BeaconBlock
| GossipKind::BeaconAggregateAndProof
| GossipKind::BlobSidecar(_)
| GossipKind::DataColumnSidecar(_)
| GossipKind::VoluntaryExit
| GossipKind::ProposerSlashing
| GossipKind::AttesterSlashing
| GossipKind::SignedContributionAndProof
| GossipKind::BlsToExecutionChange
| GossipKind::LightClientFinalityUpdate
| GossipKind::LightClientOptimisticUpdate => false,
}
}
pub fn all_topics_at_fork<E: EthSpec>(fork: ForkName, spec: &ChainSpec) -> Vec<GossipKind> {
// Compute the worst case of all forks
let sampling_subnets = HashSet::from_iter(spec.all_data_column_sidecar_subnets());
let opts = TopicConfig {
enable_light_client_server: true,
subscribe_all_subnets: true,
subscribe_all_data_column_subnets: true,
sampling_subnets: &sampling_subnets,
};
core_topics_to_subscribe::<E>(fork, &opts, spec)
} }
/// A gossipsub topic which encapsulates the type of messages that should be sent and received over /// A gossipsub topic which encapsulates the type of messages that should be sent and received over
@@ -368,10 +374,9 @@ fn subnet_topic_index(topic: &str) -> Option<GossipKind> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use types::MainnetEthSpec;
use super::GossipKind::*; use super::GossipKind::*;
use super::*; use super::*;
use types::{Epoch, MainnetEthSpec as E};
const GOOD_FORK_DIGEST: &str = "e1925f3b"; const GOOD_FORK_DIGEST: &str = "e1925f3b";
const BAD_PREFIX: &str = "tezos"; const BAD_PREFIX: &str = "tezos";
@@ -496,31 +501,94 @@ mod tests {
assert_eq!("attester_slashing", AttesterSlashing.as_ref()); assert_eq!("attester_slashing", AttesterSlashing.as_ref());
} }
fn get_spec() -> ChainSpec {
let mut spec = E::default_spec();
spec.altair_fork_epoch = Some(Epoch::new(1));
spec.bellatrix_fork_epoch = Some(Epoch::new(2));
spec.capella_fork_epoch = Some(Epoch::new(3));
spec.deneb_fork_epoch = Some(Epoch::new(4));
spec.electra_fork_epoch = Some(Epoch::new(5));
spec.fulu_fork_epoch = Some(Epoch::new(6));
spec
}
fn get_sampling_subnets() -> HashSet<DataColumnSubnetId> {
HashSet::new()
}
fn get_topic_config(sampling_subnets: &HashSet<DataColumnSubnetId>) -> TopicConfig {
TopicConfig {
enable_light_client_server: false,
subscribe_all_subnets: false,
subscribe_all_data_column_subnets: false,
sampling_subnets,
}
}
#[test]
fn base_topics_are_always_active() {
let spec = get_spec();
let s = get_sampling_subnets();
let topic_config = get_topic_config(&s);
for fork in ForkName::list_all() {
assert!(core_topics_to_subscribe::<E>(fork, &topic_config, &spec,)
.contains(&GossipKind::BeaconBlock));
}
}
#[test]
fn blobs_are_not_subscribed_in_peerdas() {
let spec = get_spec();
let s = get_sampling_subnets();
let topic_config = get_topic_config(&s);
assert!(
!core_topics_to_subscribe::<E>(ForkName::Fulu, &topic_config, &spec,)
.contains(&GossipKind::BlobSidecar(0))
);
}
#[test]
fn columns_are_subscribed_in_peerdas() {
let spec = get_spec();
let s = get_sampling_subnets();
let mut topic_config = get_topic_config(&s);
topic_config.subscribe_all_data_column_subnets = true;
assert!(
core_topics_to_subscribe::<E>(ForkName::Fulu, &topic_config, &spec)
.contains(&GossipKind::DataColumnSidecar(0.into()))
);
}
#[test] #[test]
fn test_core_topics_to_subscribe() { fn test_core_topics_to_subscribe() {
type E = MainnetEthSpec; let spec = get_spec();
let spec = E::default_spec(); let s = HashSet::from_iter([1, 2].map(DataColumnSubnetId::new));
let mut all_topics = Vec::new(); let mut topic_config = get_topic_config(&s);
let topic_config = TopicConfig { topic_config.enable_light_client_server = true;
subscribe_all_data_column_subnets: false,
sampling_subnets: &HashSet::from_iter([1, 2].map(DataColumnSubnetId::new)),
};
let mut fulu_core_topics = fork_core_topics::<E>(&ForkName::Fulu, &spec, &topic_config);
let mut electra_core_topics =
fork_core_topics::<E>(&ForkName::Electra, &spec, &topic_config);
let mut deneb_core_topics = fork_core_topics::<E>(&ForkName::Deneb, &spec, &topic_config);
all_topics.append(&mut fulu_core_topics);
all_topics.append(&mut electra_core_topics);
all_topics.append(&mut deneb_core_topics);
all_topics.extend(CAPELLA_CORE_TOPICS);
all_topics.extend(ALTAIR_CORE_TOPICS);
all_topics.extend(BASE_CORE_TOPICS);
let latest_fork = *ForkName::list_all().last().unwrap(); let latest_fork = *ForkName::list_all().last().unwrap();
let core_topics = core_topics_to_subscribe::<E>(latest_fork, &spec, &topic_config); let topics = core_topics_to_subscribe::<E>(latest_fork, &topic_config, &spec);
let mut expected_topics = vec![
GossipKind::BeaconBlock,
GossipKind::BeaconAggregateAndProof,
GossipKind::VoluntaryExit,
GossipKind::ProposerSlashing,
GossipKind::AttesterSlashing,
GossipKind::SignedContributionAndProof,
GossipKind::LightClientFinalityUpdate,
GossipKind::LightClientOptimisticUpdate,
GossipKind::BlsToExecutionChange,
];
for subnet in s {
expected_topics.push(GossipKind::DataColumnSidecar(subnet));
}
// Need to check all the topics exist in an order independent manner // Need to check all the topics exist in an order independent manner
for topic in all_topics { for expected_topic in expected_topics {
assert!(core_topics.contains(&topic)); assert!(
topics.contains(&expected_topic),
"Should contain {:?}",
expected_topic
);
} }
} }
} }

View File

@@ -181,8 +181,6 @@ pub struct NetworkService<T: BeaconChainTypes> {
next_fork_subscriptions: Pin<Box<OptionFuture<Sleep>>>, next_fork_subscriptions: Pin<Box<OptionFuture<Sleep>>>,
/// A delay that expires when we need to unsubscribe from old fork topics. /// A delay that expires when we need to unsubscribe from old fork topics.
next_unsubscribe: Pin<Box<OptionFuture<Sleep>>>, next_unsubscribe: Pin<Box<OptionFuture<Sleep>>>,
/// Subscribe to all the subnets once synced.
subscribe_all_subnets: bool,
/// Shutdown beacon node after sync is complete. /// Shutdown beacon node after sync is complete.
shutdown_after_sync: bool, shutdown_after_sync: bool,
/// Whether metrics are enabled or not. /// Whether metrics are enabled or not.
@@ -191,8 +189,6 @@ pub struct NetworkService<T: BeaconChainTypes> {
metrics_update: tokio::time::Interval, metrics_update: tokio::time::Interval,
/// gossipsub_parameter_update timer /// gossipsub_parameter_update timer
gossipsub_parameter_update: tokio::time::Interval, gossipsub_parameter_update: tokio::time::Interval,
/// enable_light_client_server indicator
enable_light_client_server: bool,
/// The logger for the network service. /// The logger for the network service.
fork_context: Arc<ForkContext>, fork_context: Arc<ForkContext>,
log: slog::Logger, log: slog::Logger,
@@ -347,14 +343,12 @@ impl<T: BeaconChainTypes> NetworkService<T> {
next_fork_update, next_fork_update,
next_fork_subscriptions, next_fork_subscriptions,
next_unsubscribe, next_unsubscribe,
subscribe_all_subnets: config.subscribe_all_subnets,
shutdown_after_sync: config.shutdown_after_sync, shutdown_after_sync: config.shutdown_after_sync,
metrics_enabled: config.metrics_enabled, metrics_enabled: config.metrics_enabled,
metrics_update, metrics_update,
gossipsub_parameter_update, gossipsub_parameter_update,
fork_context, fork_context,
log: network_log, log: network_log,
enable_light_client_server: config.enable_light_client_server,
}; };
Ok((network_service, network_globals, network_senders)) Ok((network_service, network_globals, network_senders))
@@ -713,8 +707,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
let mut subscribed_topics: Vec<GossipTopic> = vec![]; let mut subscribed_topics: Vec<GossipTopic> = vec![];
for topic_kind in core_topics_to_subscribe::<T::EthSpec>( for topic_kind in core_topics_to_subscribe::<T::EthSpec>(
self.fork_context.current_fork(), self.fork_context.current_fork(),
&self.fork_context.spec,
&self.network_globals.as_topic_config(), &self.network_globals.as_topic_config(),
&self.fork_context.spec,
) { ) {
for fork_digest in self.required_gossip_fork_digests() { for fork_digest in self.required_gossip_fork_digests() {
let topic = GossipTopic::new( let topic = GossipTopic::new(
@@ -730,57 +724,18 @@ impl<T: BeaconChainTypes> NetworkService<T> {
} }
} }
if self.enable_light_client_server {
for light_client_topic_kind in
lighthouse_network::types::LIGHT_CLIENT_GOSSIP_TOPICS.iter()
{
for fork_digest in self.required_gossip_fork_digests() {
let light_client_topic = GossipTopic::new(
light_client_topic_kind.clone(),
GossipEncoding::default(),
fork_digest,
);
if self.libp2p.subscribe(light_client_topic.clone()) {
subscribed_topics.push(light_client_topic);
} else {
warn!(self.log, "Could not subscribe to topic"; "topic" => %light_client_topic);
}
}
}
}
// If we are to subscribe to all subnets we do it here // If we are to subscribe to all subnets we do it here
if self.subscribe_all_subnets { if self.network_globals.config.subscribe_all_subnets {
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() { for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {
let subnet = Subnet::Attestation(SubnetId::new(subnet_id)); let subnet = Subnet::Attestation(SubnetId::new(subnet_id));
// Update the ENR bitfield // Update the ENR bitfield
self.libp2p.update_enr_subnet(subnet, true); self.libp2p.update_enr_subnet(subnet, true);
for fork_digest in self.required_gossip_fork_digests() {
let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
if self.libp2p.subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
} }
let subnet_max = <<T as BeaconChainTypes>::EthSpec as EthSpec>::SyncCommitteeSubnetCount::to_u64(); let subnet_max = <<T as BeaconChainTypes>::EthSpec as EthSpec>::SyncCommitteeSubnetCount::to_u64();
for subnet_id in 0..subnet_max { for subnet_id in 0..subnet_max {
let subnet = Subnet::SyncCommittee(SyncSubnetId::new(subnet_id)); let subnet = Subnet::SyncCommittee(SyncSubnetId::new(subnet_id));
// Update the ENR bitfield // Update the ENR bitfield
self.libp2p.update_enr_subnet(subnet, true); self.libp2p.update_enr_subnet(subnet, true);
for fork_digest in self.required_gossip_fork_digests() {
let topic = GossipTopic::new(
subnet.into(),
GossipEncoding::default(),
fork_digest,
);
if self.libp2p.subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
} }
} }
@@ -909,8 +864,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
fn subscribed_core_topics(&self) -> bool { fn subscribed_core_topics(&self) -> bool {
let core_topics = core_topics_to_subscribe::<T::EthSpec>( let core_topics = core_topics_to_subscribe::<T::EthSpec>(
self.fork_context.current_fork(), self.fork_context.current_fork(),
&self.fork_context.spec,
&self.network_globals.as_topic_config(), &self.network_globals.as_topic_config(),
&self.fork_context.spec,
); );
let core_topics: HashSet<&GossipKind> = HashSet::from_iter(&core_topics); let core_topics: HashSet<&GossipKind> = HashSet::from_iter(&core_topics);
let subscriptions = self.network_globals.gossipsub_subscriptions.read(); let subscriptions = self.network_globals.gossipsub_subscriptions.read();

View File

@@ -712,6 +712,10 @@ impl ChainSpec {
} }
} }
pub fn all_data_column_sidecar_subnets(&self) -> impl Iterator<Item = DataColumnSubnetId> {
(0..self.data_column_sidecar_subnet_count).map(DataColumnSubnetId::new)
}
/// Returns a `ChainSpec` compatible with the Ethereum Foundation specification. /// Returns a `ChainSpec` compatible with the Ethereum Foundation specification.
pub fn mainnet() -> Self { pub fn mainnet() -> Self {
Self { Self {