mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-02 16:21:42 +00:00
Unsubscribe blob topics at Fulu fork (#6932)
Addresses #6854. PeerDAS requires unsubscribing a Gossip topic at a fork boundary. This is not possible with our current topic machinery. Instead of defining which topics have to be **added** at a given fork, we define the complete set of topics at a given fork. The new start of the show and key function is: ```rust pub fn core_topics_to_subscribe<E: EthSpec>( fork_name: ForkName, opts: &TopicConfig, spec: &ChainSpec, ) -> Vec<GossipKind> { // ... if fork_name.deneb_enabled() && !fork_name.fulu_enabled() { // All of deneb blob topics are core topics for i in 0..spec.blob_sidecar_subnet_count(fork_name) { topics.push(GossipKind::BlobSidecar(i)); } } // ... } ``` `core_topics_to_subscribe` only returns the blob topics if `fork < Fulu`. Then at the fork boundary, we subscribe with the new fork digest to `core_topics_to_subscribe(next_fork)`, which excludes the blob topics. I added `is_fork_non_core_topic` to carry on to the next fork the aggregator topics for attestations and sync committee messages. This approach is future-proof if those topics ever become fork-dependent. Closes https://github.com/sigp/lighthouse/issues/6854
This commit is contained in:
@@ -14,9 +14,8 @@ use crate::rpc::{
|
||||
RequestType, ResponseTermination, RpcErrorResponse, RpcResponse, RpcSuccessResponse, RPC,
|
||||
};
|
||||
use crate::types::{
|
||||
attestation_sync_committee_topics, fork_core_topics, subnet_from_topic_hash, GossipEncoding,
|
||||
GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, ALTAIR_CORE_TOPICS,
|
||||
BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
|
||||
all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash,
|
||||
GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery,
|
||||
};
|
||||
use crate::EnrExt;
|
||||
use crate::Eth2Enr;
|
||||
@@ -280,14 +279,39 @@ impl<E: EthSpec> Network<E> {
|
||||
// Set up a scoring update interval
|
||||
let update_gossipsub_scores = tokio::time::interval(params.decay_interval);
|
||||
|
||||
let max_topics = ctx.chain_spec.attestation_subnet_count as usize
|
||||
+ SYNC_COMMITTEE_SUBNET_COUNT as usize
|
||||
+ ctx.chain_spec.blob_sidecar_subnet_count_max() as usize
|
||||
+ ctx.chain_spec.data_column_sidecar_subnet_count as usize
|
||||
+ BASE_CORE_TOPICS.len()
|
||||
+ ALTAIR_CORE_TOPICS.len()
|
||||
+ CAPELLA_CORE_TOPICS.len() // 0 core deneb and electra topics
|
||||
+ LIGHT_CLIENT_GOSSIP_TOPICS.len();
|
||||
let current_and_future_forks = ForkName::list_all().into_iter().filter_map(|fork| {
|
||||
if fork >= ctx.fork_context.current_fork() {
|
||||
ctx.fork_context
|
||||
.to_context_bytes(fork)
|
||||
.map(|fork_digest| (fork, fork_digest))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
|
||||
let all_topics_for_forks = current_and_future_forks
|
||||
.map(|(fork, fork_digest)| {
|
||||
all_topics_at_fork::<E>(fork, &ctx.chain_spec)
|
||||
.into_iter()
|
||||
.map(|topic| {
|
||||
Topic::new(GossipTopic::new(
|
||||
topic,
|
||||
GossipEncoding::default(),
|
||||
fork_digest,
|
||||
))
|
||||
.into()
|
||||
})
|
||||
.collect::<Vec<TopicHash>>()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// For simplicity find the fork with the most individual topics and assume all forks
|
||||
// have the same topic count
|
||||
let max_topics_at_any_fork = all_topics_for_forks
|
||||
.iter()
|
||||
.map(|topics| topics.len())
|
||||
.max()
|
||||
.expect("each fork has at least 5 hardcoded core topics");
|
||||
|
||||
let possible_fork_digests = ctx.fork_context.all_fork_digests();
|
||||
let filter = gossipsub::MaxCountSubscriptionFilter {
|
||||
@@ -297,9 +321,9 @@ impl<E: EthSpec> Network<E> {
|
||||
SYNC_COMMITTEE_SUBNET_COUNT,
|
||||
),
|
||||
// during a fork we subscribe to both the old and new topics
|
||||
max_subscribed_topics: max_topics * 4,
|
||||
max_subscribed_topics: max_topics_at_any_fork * 4,
|
||||
// 424 in theory = (64 attestation + 4 sync committee + 7 core topics + 9 blob topics + 128 column topics) * 2
|
||||
max_subscriptions_per_request: max_topics * 2,
|
||||
max_subscriptions_per_request: max_topics_at_any_fork * 2,
|
||||
};
|
||||
|
||||
// If metrics are enabled for libp2p build the configuration
|
||||
@@ -332,17 +356,9 @@ impl<E: EthSpec> Network<E> {
|
||||
// If we are using metrics, then register which topics we want to make sure to keep
|
||||
// track of
|
||||
if ctx.libp2p_registry.is_some() {
|
||||
let topics_to_keep_metrics_for = attestation_sync_committee_topics::<E>()
|
||||
.map(|gossip_kind| {
|
||||
Topic::from(GossipTopic::new(
|
||||
gossip_kind,
|
||||
GossipEncoding::default(),
|
||||
enr_fork_id.fork_digest,
|
||||
))
|
||||
.into()
|
||||
})
|
||||
.collect::<Vec<TopicHash>>();
|
||||
gossipsub.register_topics_for_metrics(topics_to_keep_metrics_for);
|
||||
for topics in all_topics_for_forks {
|
||||
gossipsub.register_topics_for_metrics(topics);
|
||||
}
|
||||
}
|
||||
|
||||
(gossipsub, update_gossipsub_scores)
|
||||
@@ -700,38 +716,26 @@ impl<E: EthSpec> Network<E> {
|
||||
|
||||
/// Subscribe to all required topics for the `new_fork` with the given `new_fork_digest`.
|
||||
pub fn subscribe_new_fork_topics(&mut self, new_fork: ForkName, new_fork_digest: [u8; 4]) {
|
||||
// Subscribe to existing topics with new fork digest
|
||||
// Re-subscribe to non-core topics with the new fork digest
|
||||
let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone();
|
||||
for mut topic in subscriptions.into_iter() {
|
||||
topic.fork_digest = new_fork_digest;
|
||||
self.subscribe(topic);
|
||||
if is_fork_non_core_topic(&topic, new_fork) {
|
||||
topic.fork_digest = new_fork_digest;
|
||||
self.subscribe(topic);
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe to core topics for the new fork
|
||||
for kind in fork_core_topics::<E>(
|
||||
&new_fork,
|
||||
&self.fork_context.spec,
|
||||
for kind in core_topics_to_subscribe::<E>(
|
||||
new_fork,
|
||||
&self.network_globals.as_topic_config(),
|
||||
&self.fork_context.spec,
|
||||
) {
|
||||
let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest);
|
||||
self.subscribe(topic);
|
||||
}
|
||||
|
||||
// TODO(das): unsubscribe from blob topics at the Fulu fork
|
||||
|
||||
// Register the new topics for metrics
|
||||
let topics_to_keep_metrics_for = attestation_sync_committee_topics::<E>()
|
||||
.map(|gossip_kind| {
|
||||
Topic::from(GossipTopic::new(
|
||||
gossip_kind,
|
||||
GossipEncoding::default(),
|
||||
new_fork_digest,
|
||||
))
|
||||
.into()
|
||||
})
|
||||
.collect::<Vec<TopicHash>>();
|
||||
self.gossipsub_mut()
|
||||
.register_topics_for_metrics(topics_to_keep_metrics_for);
|
||||
// Already registered all possible gossipsub topics for metrics
|
||||
}
|
||||
|
||||
/// Unsubscribe from all topics that don't have the given fork_digest
|
||||
|
||||
@@ -187,6 +187,8 @@ impl<E: EthSpec> NetworkGlobals<E> {
|
||||
/// Returns the TopicConfig to compute the set of Gossip topics for a given fork
|
||||
pub fn as_topic_config(&self) -> TopicConfig {
|
||||
TopicConfig {
|
||||
enable_light_client_server: self.config.enable_light_client_server,
|
||||
subscribe_all_subnets: self.config.subscribe_all_subnets,
|
||||
subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets,
|
||||
sampling_subnets: &self.sampling_subnets,
|
||||
}
|
||||
|
||||
@@ -16,7 +16,6 @@ pub use pubsub::{PubsubMessage, SnappyTransform};
|
||||
pub use subnet::{Subnet, SubnetDiscovery};
|
||||
pub use sync_state::{BackFillState, SyncState};
|
||||
pub use topics::{
|
||||
attestation_sync_committee_topics, core_topics_to_subscribe, fork_core_topics,
|
||||
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, TopicConfig,
|
||||
ALTAIR_CORE_TOPICS, BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
|
||||
all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash,
|
||||
GossipEncoding, GossipKind, GossipTopic, TopicConfig,
|
||||
};
|
||||
|
||||
@@ -25,104 +25,110 @@ pub const BLS_TO_EXECUTION_CHANGE_TOPIC: &str = "bls_to_execution_change";
|
||||
pub const LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update";
|
||||
pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update";
|
||||
|
||||
pub const BASE_CORE_TOPICS: [GossipKind; 5] = [
|
||||
GossipKind::BeaconBlock,
|
||||
GossipKind::BeaconAggregateAndProof,
|
||||
GossipKind::VoluntaryExit,
|
||||
GossipKind::ProposerSlashing,
|
||||
GossipKind::AttesterSlashing,
|
||||
];
|
||||
|
||||
pub const ALTAIR_CORE_TOPICS: [GossipKind; 1] = [GossipKind::SignedContributionAndProof];
|
||||
|
||||
pub const CAPELLA_CORE_TOPICS: [GossipKind; 1] = [GossipKind::BlsToExecutionChange];
|
||||
|
||||
pub const LIGHT_CLIENT_GOSSIP_TOPICS: [GossipKind; 2] = [
|
||||
GossipKind::LightClientFinalityUpdate,
|
||||
GossipKind::LightClientOptimisticUpdate,
|
||||
];
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct TopicConfig<'a> {
|
||||
pub enable_light_client_server: bool,
|
||||
pub subscribe_all_subnets: bool,
|
||||
pub subscribe_all_data_column_subnets: bool,
|
||||
pub sampling_subnets: &'a HashSet<DataColumnSubnetId>,
|
||||
}
|
||||
|
||||
/// Returns the core topics associated with each fork that are new to the previous fork
|
||||
pub fn fork_core_topics<E: EthSpec>(
|
||||
fork_name: &ForkName,
|
||||
spec: &ChainSpec,
|
||||
topic_config: &TopicConfig,
|
||||
) -> Vec<GossipKind> {
|
||||
match fork_name {
|
||||
ForkName::Base => BASE_CORE_TOPICS.to_vec(),
|
||||
ForkName::Altair => ALTAIR_CORE_TOPICS.to_vec(),
|
||||
ForkName::Bellatrix => vec![],
|
||||
ForkName::Capella => CAPELLA_CORE_TOPICS.to_vec(),
|
||||
ForkName::Deneb => {
|
||||
// All of deneb blob topics are core topics
|
||||
let mut deneb_blob_topics = Vec::new();
|
||||
for i in 0..spec.blob_sidecar_subnet_count(ForkName::Deneb) {
|
||||
deneb_blob_topics.push(GossipKind::BlobSidecar(i));
|
||||
}
|
||||
deneb_blob_topics
|
||||
}
|
||||
ForkName::Electra => {
|
||||
// All of electra blob topics are core topics
|
||||
let mut electra_blob_topics = Vec::new();
|
||||
for i in 0..spec.blob_sidecar_subnet_count(ForkName::Electra) {
|
||||
electra_blob_topics.push(GossipKind::BlobSidecar(i));
|
||||
}
|
||||
electra_blob_topics
|
||||
}
|
||||
ForkName::Fulu => {
|
||||
let mut topics = vec![];
|
||||
if topic_config.subscribe_all_data_column_subnets {
|
||||
for column_subnet in 0..spec.data_column_sidecar_subnet_count {
|
||||
topics.push(GossipKind::DataColumnSidecar(DataColumnSubnetId::new(
|
||||
column_subnet,
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
for column_subnet in topic_config.sampling_subnets {
|
||||
topics.push(GossipKind::DataColumnSidecar(*column_subnet));
|
||||
}
|
||||
}
|
||||
topics
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns all the attestation and sync committee topics, for a given fork.
|
||||
pub fn attestation_sync_committee_topics<E: EthSpec>() -> impl Iterator<Item = GossipKind> {
|
||||
(0..E::SubnetBitfieldLength::to_usize())
|
||||
.map(|subnet_id| GossipKind::Attestation(SubnetId::new(subnet_id as u64)))
|
||||
.chain(
|
||||
(0..E::SyncCommitteeSubnetCount::to_usize()).map(|sync_committee_id| {
|
||||
GossipKind::SyncCommitteeMessage(SyncSubnetId::new(sync_committee_id as u64))
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns all the topics that we need to subscribe to for a given fork
|
||||
/// including topics from older forks and new topics for the current fork.
|
||||
/// Returns all the topics the node should subscribe to at `fork_name`
|
||||
pub fn core_topics_to_subscribe<E: EthSpec>(
|
||||
mut current_fork: ForkName,
|
||||
fork_name: ForkName,
|
||||
opts: &TopicConfig,
|
||||
spec: &ChainSpec,
|
||||
topic_config: &TopicConfig,
|
||||
) -> Vec<GossipKind> {
|
||||
let mut topics = fork_core_topics::<E>(¤t_fork, spec, topic_config);
|
||||
while let Some(previous_fork) = current_fork.previous_fork() {
|
||||
let previous_fork_topics = fork_core_topics::<E>(&previous_fork, spec, topic_config);
|
||||
topics.extend(previous_fork_topics);
|
||||
current_fork = previous_fork;
|
||||
let mut topics = vec![
|
||||
GossipKind::BeaconBlock,
|
||||
GossipKind::BeaconAggregateAndProof,
|
||||
GossipKind::VoluntaryExit,
|
||||
GossipKind::ProposerSlashing,
|
||||
GossipKind::AttesterSlashing,
|
||||
];
|
||||
|
||||
if opts.subscribe_all_subnets {
|
||||
for i in 0..spec.attestation_subnet_count {
|
||||
topics.push(GossipKind::Attestation(i.into()));
|
||||
}
|
||||
}
|
||||
// Remove duplicates
|
||||
|
||||
if fork_name.altair_enabled() {
|
||||
topics.push(GossipKind::SignedContributionAndProof);
|
||||
|
||||
if opts.subscribe_all_subnets {
|
||||
for i in 0..E::SyncCommitteeSubnetCount::to_u64() {
|
||||
topics.push(GossipKind::SyncCommitteeMessage(i.into()));
|
||||
}
|
||||
}
|
||||
|
||||
if opts.enable_light_client_server {
|
||||
topics.push(GossipKind::LightClientFinalityUpdate);
|
||||
topics.push(GossipKind::LightClientOptimisticUpdate);
|
||||
}
|
||||
}
|
||||
|
||||
if fork_name.capella_enabled() {
|
||||
topics.push(GossipKind::BlsToExecutionChange);
|
||||
}
|
||||
|
||||
if fork_name.deneb_enabled() && !fork_name.fulu_enabled() {
|
||||
// All of deneb blob topics are core topics
|
||||
for i in 0..spec.blob_sidecar_subnet_count(fork_name) {
|
||||
topics.push(GossipKind::BlobSidecar(i));
|
||||
}
|
||||
}
|
||||
|
||||
if fork_name.fulu_enabled() {
|
||||
if opts.subscribe_all_data_column_subnets {
|
||||
for i in 0..spec.data_column_sidecar_subnet_count {
|
||||
topics.push(GossipKind::DataColumnSidecar(i.into()));
|
||||
}
|
||||
} else {
|
||||
for subnet in opts.sampling_subnets {
|
||||
topics.push(GossipKind::DataColumnSidecar(*subnet));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
topics
|
||||
.into_iter()
|
||||
.collect::<std::collections::HashSet<_>>()
|
||||
.into_iter()
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Returns true if a given non-core `GossipTopic` MAY be subscribed to at this fork.
|
||||
///
|
||||
/// For example: the `Attestation` topic is not subscribed as a core topic if
|
||||
/// `subscribe_all_subnets = false` but we may subscribe to it outside of a fork
|
||||
/// boundary if the node is an aggregator.
|
||||
pub fn is_fork_non_core_topic(topic: &GossipTopic, _fork_name: ForkName) -> bool {
|
||||
match topic.kind() {
|
||||
// Node may be aggregator of attestation and sync_committee_message topics for all known
|
||||
// forks
|
||||
GossipKind::Attestation(_) | GossipKind::SyncCommitteeMessage(_) => true,
|
||||
// All these topics are core-only
|
||||
GossipKind::BeaconBlock
|
||||
| GossipKind::BeaconAggregateAndProof
|
||||
| GossipKind::BlobSidecar(_)
|
||||
| GossipKind::DataColumnSidecar(_)
|
||||
| GossipKind::VoluntaryExit
|
||||
| GossipKind::ProposerSlashing
|
||||
| GossipKind::AttesterSlashing
|
||||
| GossipKind::SignedContributionAndProof
|
||||
| GossipKind::BlsToExecutionChange
|
||||
| GossipKind::LightClientFinalityUpdate
|
||||
| GossipKind::LightClientOptimisticUpdate => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn all_topics_at_fork<E: EthSpec>(fork: ForkName, spec: &ChainSpec) -> Vec<GossipKind> {
|
||||
// Compute the worst case of all forks
|
||||
let sampling_subnets = HashSet::from_iter(spec.all_data_column_sidecar_subnets());
|
||||
let opts = TopicConfig {
|
||||
enable_light_client_server: true,
|
||||
subscribe_all_subnets: true,
|
||||
subscribe_all_data_column_subnets: true,
|
||||
sampling_subnets: &sampling_subnets,
|
||||
};
|
||||
core_topics_to_subscribe::<E>(fork, &opts, spec)
|
||||
}
|
||||
|
||||
/// A gossipsub topic which encapsulates the type of messages that should be sent and received over
|
||||
@@ -368,10 +374,9 @@ fn subnet_topic_index(topic: &str) -> Option<GossipKind> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use types::MainnetEthSpec;
|
||||
|
||||
use super::GossipKind::*;
|
||||
use super::*;
|
||||
use types::{Epoch, MainnetEthSpec as E};
|
||||
|
||||
const GOOD_FORK_DIGEST: &str = "e1925f3b";
|
||||
const BAD_PREFIX: &str = "tezos";
|
||||
@@ -496,31 +501,94 @@ mod tests {
|
||||
assert_eq!("attester_slashing", AttesterSlashing.as_ref());
|
||||
}
|
||||
|
||||
fn get_spec() -> ChainSpec {
|
||||
let mut spec = E::default_spec();
|
||||
spec.altair_fork_epoch = Some(Epoch::new(1));
|
||||
spec.bellatrix_fork_epoch = Some(Epoch::new(2));
|
||||
spec.capella_fork_epoch = Some(Epoch::new(3));
|
||||
spec.deneb_fork_epoch = Some(Epoch::new(4));
|
||||
spec.electra_fork_epoch = Some(Epoch::new(5));
|
||||
spec.fulu_fork_epoch = Some(Epoch::new(6));
|
||||
spec
|
||||
}
|
||||
|
||||
fn get_sampling_subnets() -> HashSet<DataColumnSubnetId> {
|
||||
HashSet::new()
|
||||
}
|
||||
|
||||
fn get_topic_config(sampling_subnets: &HashSet<DataColumnSubnetId>) -> TopicConfig {
|
||||
TopicConfig {
|
||||
enable_light_client_server: false,
|
||||
subscribe_all_subnets: false,
|
||||
subscribe_all_data_column_subnets: false,
|
||||
sampling_subnets,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn base_topics_are_always_active() {
|
||||
let spec = get_spec();
|
||||
let s = get_sampling_subnets();
|
||||
let topic_config = get_topic_config(&s);
|
||||
for fork in ForkName::list_all() {
|
||||
assert!(core_topics_to_subscribe::<E>(fork, &topic_config, &spec,)
|
||||
.contains(&GossipKind::BeaconBlock));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn blobs_are_not_subscribed_in_peerdas() {
|
||||
let spec = get_spec();
|
||||
let s = get_sampling_subnets();
|
||||
let topic_config = get_topic_config(&s);
|
||||
assert!(
|
||||
!core_topics_to_subscribe::<E>(ForkName::Fulu, &topic_config, &spec,)
|
||||
.contains(&GossipKind::BlobSidecar(0))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn columns_are_subscribed_in_peerdas() {
|
||||
let spec = get_spec();
|
||||
let s = get_sampling_subnets();
|
||||
let mut topic_config = get_topic_config(&s);
|
||||
topic_config.subscribe_all_data_column_subnets = true;
|
||||
assert!(
|
||||
core_topics_to_subscribe::<E>(ForkName::Fulu, &topic_config, &spec)
|
||||
.contains(&GossipKind::DataColumnSidecar(0.into()))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_core_topics_to_subscribe() {
|
||||
type E = MainnetEthSpec;
|
||||
let spec = E::default_spec();
|
||||
let mut all_topics = Vec::new();
|
||||
let topic_config = TopicConfig {
|
||||
subscribe_all_data_column_subnets: false,
|
||||
sampling_subnets: &HashSet::from_iter([1, 2].map(DataColumnSubnetId::new)),
|
||||
};
|
||||
let mut fulu_core_topics = fork_core_topics::<E>(&ForkName::Fulu, &spec, &topic_config);
|
||||
let mut electra_core_topics =
|
||||
fork_core_topics::<E>(&ForkName::Electra, &spec, &topic_config);
|
||||
let mut deneb_core_topics = fork_core_topics::<E>(&ForkName::Deneb, &spec, &topic_config);
|
||||
all_topics.append(&mut fulu_core_topics);
|
||||
all_topics.append(&mut electra_core_topics);
|
||||
all_topics.append(&mut deneb_core_topics);
|
||||
all_topics.extend(CAPELLA_CORE_TOPICS);
|
||||
all_topics.extend(ALTAIR_CORE_TOPICS);
|
||||
all_topics.extend(BASE_CORE_TOPICS);
|
||||
|
||||
let spec = get_spec();
|
||||
let s = HashSet::from_iter([1, 2].map(DataColumnSubnetId::new));
|
||||
let mut topic_config = get_topic_config(&s);
|
||||
topic_config.enable_light_client_server = true;
|
||||
let latest_fork = *ForkName::list_all().last().unwrap();
|
||||
let core_topics = core_topics_to_subscribe::<E>(latest_fork, &spec, &topic_config);
|
||||
let topics = core_topics_to_subscribe::<E>(latest_fork, &topic_config, &spec);
|
||||
|
||||
let mut expected_topics = vec![
|
||||
GossipKind::BeaconBlock,
|
||||
GossipKind::BeaconAggregateAndProof,
|
||||
GossipKind::VoluntaryExit,
|
||||
GossipKind::ProposerSlashing,
|
||||
GossipKind::AttesterSlashing,
|
||||
GossipKind::SignedContributionAndProof,
|
||||
GossipKind::LightClientFinalityUpdate,
|
||||
GossipKind::LightClientOptimisticUpdate,
|
||||
GossipKind::BlsToExecutionChange,
|
||||
];
|
||||
for subnet in s {
|
||||
expected_topics.push(GossipKind::DataColumnSidecar(subnet));
|
||||
}
|
||||
// Need to check all the topics exist in an order independent manner
|
||||
for topic in all_topics {
|
||||
assert!(core_topics.contains(&topic));
|
||||
for expected_topic in expected_topics {
|
||||
assert!(
|
||||
topics.contains(&expected_topic),
|
||||
"Should contain {:?}",
|
||||
expected_topic
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -181,8 +181,6 @@ pub struct NetworkService<T: BeaconChainTypes> {
|
||||
next_fork_subscriptions: Pin<Box<OptionFuture<Sleep>>>,
|
||||
/// A delay that expires when we need to unsubscribe from old fork topics.
|
||||
next_unsubscribe: Pin<Box<OptionFuture<Sleep>>>,
|
||||
/// Subscribe to all the subnets once synced.
|
||||
subscribe_all_subnets: bool,
|
||||
/// Shutdown beacon node after sync is complete.
|
||||
shutdown_after_sync: bool,
|
||||
/// Whether metrics are enabled or not.
|
||||
@@ -191,8 +189,6 @@ pub struct NetworkService<T: BeaconChainTypes> {
|
||||
metrics_update: tokio::time::Interval,
|
||||
/// gossipsub_parameter_update timer
|
||||
gossipsub_parameter_update: tokio::time::Interval,
|
||||
/// enable_light_client_server indicator
|
||||
enable_light_client_server: bool,
|
||||
/// The logger for the network service.
|
||||
fork_context: Arc<ForkContext>,
|
||||
log: slog::Logger,
|
||||
@@ -347,14 +343,12 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
next_fork_update,
|
||||
next_fork_subscriptions,
|
||||
next_unsubscribe,
|
||||
subscribe_all_subnets: config.subscribe_all_subnets,
|
||||
shutdown_after_sync: config.shutdown_after_sync,
|
||||
metrics_enabled: config.metrics_enabled,
|
||||
metrics_update,
|
||||
gossipsub_parameter_update,
|
||||
fork_context,
|
||||
log: network_log,
|
||||
enable_light_client_server: config.enable_light_client_server,
|
||||
};
|
||||
|
||||
Ok((network_service, network_globals, network_senders))
|
||||
@@ -713,8 +707,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
let mut subscribed_topics: Vec<GossipTopic> = vec![];
|
||||
for topic_kind in core_topics_to_subscribe::<T::EthSpec>(
|
||||
self.fork_context.current_fork(),
|
||||
&self.fork_context.spec,
|
||||
&self.network_globals.as_topic_config(),
|
||||
&self.fork_context.spec,
|
||||
) {
|
||||
for fork_digest in self.required_gossip_fork_digests() {
|
||||
let topic = GossipTopic::new(
|
||||
@@ -730,57 +724,18 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
}
|
||||
}
|
||||
|
||||
if self.enable_light_client_server {
|
||||
for light_client_topic_kind in
|
||||
lighthouse_network::types::LIGHT_CLIENT_GOSSIP_TOPICS.iter()
|
||||
{
|
||||
for fork_digest in self.required_gossip_fork_digests() {
|
||||
let light_client_topic = GossipTopic::new(
|
||||
light_client_topic_kind.clone(),
|
||||
GossipEncoding::default(),
|
||||
fork_digest,
|
||||
);
|
||||
if self.libp2p.subscribe(light_client_topic.clone()) {
|
||||
subscribed_topics.push(light_client_topic);
|
||||
} else {
|
||||
warn!(self.log, "Could not subscribe to topic"; "topic" => %light_client_topic);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we are to subscribe to all subnets we do it here
|
||||
if self.subscribe_all_subnets {
|
||||
if self.network_globals.config.subscribe_all_subnets {
|
||||
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {
|
||||
let subnet = Subnet::Attestation(SubnetId::new(subnet_id));
|
||||
// Update the ENR bitfield
|
||||
self.libp2p.update_enr_subnet(subnet, true);
|
||||
for fork_digest in self.required_gossip_fork_digests() {
|
||||
let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
|
||||
if self.libp2p.subscribe(topic.clone()) {
|
||||
subscribed_topics.push(topic);
|
||||
} else {
|
||||
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
|
||||
}
|
||||
}
|
||||
}
|
||||
let subnet_max = <<T as BeaconChainTypes>::EthSpec as EthSpec>::SyncCommitteeSubnetCount::to_u64();
|
||||
for subnet_id in 0..subnet_max {
|
||||
let subnet = Subnet::SyncCommittee(SyncSubnetId::new(subnet_id));
|
||||
// Update the ENR bitfield
|
||||
self.libp2p.update_enr_subnet(subnet, true);
|
||||
for fork_digest in self.required_gossip_fork_digests() {
|
||||
let topic = GossipTopic::new(
|
||||
subnet.into(),
|
||||
GossipEncoding::default(),
|
||||
fork_digest,
|
||||
);
|
||||
if self.libp2p.subscribe(topic.clone()) {
|
||||
subscribed_topics.push(topic);
|
||||
} else {
|
||||
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -909,8 +864,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
fn subscribed_core_topics(&self) -> bool {
|
||||
let core_topics = core_topics_to_subscribe::<T::EthSpec>(
|
||||
self.fork_context.current_fork(),
|
||||
&self.fork_context.spec,
|
||||
&self.network_globals.as_topic_config(),
|
||||
&self.fork_context.spec,
|
||||
);
|
||||
let core_topics: HashSet<&GossipKind> = HashSet::from_iter(&core_topics);
|
||||
let subscriptions = self.network_globals.gossipsub_subscriptions.read();
|
||||
|
||||
@@ -712,6 +712,10 @@ impl ChainSpec {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn all_data_column_sidecar_subnets(&self) -> impl Iterator<Item = DataColumnSubnetId> {
|
||||
(0..self.data_column_sidecar_subnet_count).map(DataColumnSubnetId::new)
|
||||
}
|
||||
|
||||
/// Returns a `ChainSpec` compatible with the Ethereum Foundation specification.
|
||||
pub fn mainnet() -> Self {
|
||||
Self {
|
||||
|
||||
Reference in New Issue
Block a user