mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-02 16:21:42 +00:00
Unsubscribe blob topics at Fulu fork (#6932)
Addresses #6854. PeerDAS requires unsubscribing a Gossip topic at a fork boundary. This is not possible with our current topic machinery. Instead of defining which topics have to be **added** at a given fork, we define the complete set of topics at a given fork. The new start of the show and key function is: ```rust pub fn core_topics_to_subscribe<E: EthSpec>( fork_name: ForkName, opts: &TopicConfig, spec: &ChainSpec, ) -> Vec<GossipKind> { // ... if fork_name.deneb_enabled() && !fork_name.fulu_enabled() { // All of deneb blob topics are core topics for i in 0..spec.blob_sidecar_subnet_count(fork_name) { topics.push(GossipKind::BlobSidecar(i)); } } // ... } ``` `core_topics_to_subscribe` only returns the blob topics if `fork < Fulu`. Then at the fork boundary, we subscribe with the new fork digest to `core_topics_to_subscribe(next_fork)`, which excludes the blob topics. I added `is_fork_non_core_topic` to carry on to the next fork the aggregator topics for attestations and sync committee messages. This approach is future-proof if those topics ever become fork-dependent. Closes https://github.com/sigp/lighthouse/issues/6854
This commit is contained in:
@@ -14,9 +14,8 @@ use crate::rpc::{
|
|||||||
RequestType, ResponseTermination, RpcErrorResponse, RpcResponse, RpcSuccessResponse, RPC,
|
RequestType, ResponseTermination, RpcErrorResponse, RpcResponse, RpcSuccessResponse, RPC,
|
||||||
};
|
};
|
||||||
use crate::types::{
|
use crate::types::{
|
||||||
attestation_sync_committee_topics, fork_core_topics, subnet_from_topic_hash, GossipEncoding,
|
all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash,
|
||||||
GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, ALTAIR_CORE_TOPICS,
|
GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery,
|
||||||
BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
|
|
||||||
};
|
};
|
||||||
use crate::EnrExt;
|
use crate::EnrExt;
|
||||||
use crate::Eth2Enr;
|
use crate::Eth2Enr;
|
||||||
@@ -280,14 +279,39 @@ impl<E: EthSpec> Network<E> {
|
|||||||
// Set up a scoring update interval
|
// Set up a scoring update interval
|
||||||
let update_gossipsub_scores = tokio::time::interval(params.decay_interval);
|
let update_gossipsub_scores = tokio::time::interval(params.decay_interval);
|
||||||
|
|
||||||
let max_topics = ctx.chain_spec.attestation_subnet_count as usize
|
let current_and_future_forks = ForkName::list_all().into_iter().filter_map(|fork| {
|
||||||
+ SYNC_COMMITTEE_SUBNET_COUNT as usize
|
if fork >= ctx.fork_context.current_fork() {
|
||||||
+ ctx.chain_spec.blob_sidecar_subnet_count_max() as usize
|
ctx.fork_context
|
||||||
+ ctx.chain_spec.data_column_sidecar_subnet_count as usize
|
.to_context_bytes(fork)
|
||||||
+ BASE_CORE_TOPICS.len()
|
.map(|fork_digest| (fork, fork_digest))
|
||||||
+ ALTAIR_CORE_TOPICS.len()
|
} else {
|
||||||
+ CAPELLA_CORE_TOPICS.len() // 0 core deneb and electra topics
|
None
|
||||||
+ LIGHT_CLIENT_GOSSIP_TOPICS.len();
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let all_topics_for_forks = current_and_future_forks
|
||||||
|
.map(|(fork, fork_digest)| {
|
||||||
|
all_topics_at_fork::<E>(fork, &ctx.chain_spec)
|
||||||
|
.into_iter()
|
||||||
|
.map(|topic| {
|
||||||
|
Topic::new(GossipTopic::new(
|
||||||
|
topic,
|
||||||
|
GossipEncoding::default(),
|
||||||
|
fork_digest,
|
||||||
|
))
|
||||||
|
.into()
|
||||||
|
})
|
||||||
|
.collect::<Vec<TopicHash>>()
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
// For simplicity find the fork with the most individual topics and assume all forks
|
||||||
|
// have the same topic count
|
||||||
|
let max_topics_at_any_fork = all_topics_for_forks
|
||||||
|
.iter()
|
||||||
|
.map(|topics| topics.len())
|
||||||
|
.max()
|
||||||
|
.expect("each fork has at least 5 hardcoded core topics");
|
||||||
|
|
||||||
let possible_fork_digests = ctx.fork_context.all_fork_digests();
|
let possible_fork_digests = ctx.fork_context.all_fork_digests();
|
||||||
let filter = gossipsub::MaxCountSubscriptionFilter {
|
let filter = gossipsub::MaxCountSubscriptionFilter {
|
||||||
@@ -297,9 +321,9 @@ impl<E: EthSpec> Network<E> {
|
|||||||
SYNC_COMMITTEE_SUBNET_COUNT,
|
SYNC_COMMITTEE_SUBNET_COUNT,
|
||||||
),
|
),
|
||||||
// during a fork we subscribe to both the old and new topics
|
// during a fork we subscribe to both the old and new topics
|
||||||
max_subscribed_topics: max_topics * 4,
|
max_subscribed_topics: max_topics_at_any_fork * 4,
|
||||||
// 424 in theory = (64 attestation + 4 sync committee + 7 core topics + 9 blob topics + 128 column topics) * 2
|
// 424 in theory = (64 attestation + 4 sync committee + 7 core topics + 9 blob topics + 128 column topics) * 2
|
||||||
max_subscriptions_per_request: max_topics * 2,
|
max_subscriptions_per_request: max_topics_at_any_fork * 2,
|
||||||
};
|
};
|
||||||
|
|
||||||
// If metrics are enabled for libp2p build the configuration
|
// If metrics are enabled for libp2p build the configuration
|
||||||
@@ -332,17 +356,9 @@ impl<E: EthSpec> Network<E> {
|
|||||||
// If we are using metrics, then register which topics we want to make sure to keep
|
// If we are using metrics, then register which topics we want to make sure to keep
|
||||||
// track of
|
// track of
|
||||||
if ctx.libp2p_registry.is_some() {
|
if ctx.libp2p_registry.is_some() {
|
||||||
let topics_to_keep_metrics_for = attestation_sync_committee_topics::<E>()
|
for topics in all_topics_for_forks {
|
||||||
.map(|gossip_kind| {
|
gossipsub.register_topics_for_metrics(topics);
|
||||||
Topic::from(GossipTopic::new(
|
}
|
||||||
gossip_kind,
|
|
||||||
GossipEncoding::default(),
|
|
||||||
enr_fork_id.fork_digest,
|
|
||||||
))
|
|
||||||
.into()
|
|
||||||
})
|
|
||||||
.collect::<Vec<TopicHash>>();
|
|
||||||
gossipsub.register_topics_for_metrics(topics_to_keep_metrics_for);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
(gossipsub, update_gossipsub_scores)
|
(gossipsub, update_gossipsub_scores)
|
||||||
@@ -700,38 +716,26 @@ impl<E: EthSpec> Network<E> {
|
|||||||
|
|
||||||
/// Subscribe to all required topics for the `new_fork` with the given `new_fork_digest`.
|
/// Subscribe to all required topics for the `new_fork` with the given `new_fork_digest`.
|
||||||
pub fn subscribe_new_fork_topics(&mut self, new_fork: ForkName, new_fork_digest: [u8; 4]) {
|
pub fn subscribe_new_fork_topics(&mut self, new_fork: ForkName, new_fork_digest: [u8; 4]) {
|
||||||
// Subscribe to existing topics with new fork digest
|
// Re-subscribe to non-core topics with the new fork digest
|
||||||
let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone();
|
let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone();
|
||||||
for mut topic in subscriptions.into_iter() {
|
for mut topic in subscriptions.into_iter() {
|
||||||
topic.fork_digest = new_fork_digest;
|
if is_fork_non_core_topic(&topic, new_fork) {
|
||||||
self.subscribe(topic);
|
topic.fork_digest = new_fork_digest;
|
||||||
|
self.subscribe(topic);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe to core topics for the new fork
|
// Subscribe to core topics for the new fork
|
||||||
for kind in fork_core_topics::<E>(
|
for kind in core_topics_to_subscribe::<E>(
|
||||||
&new_fork,
|
new_fork,
|
||||||
&self.fork_context.spec,
|
|
||||||
&self.network_globals.as_topic_config(),
|
&self.network_globals.as_topic_config(),
|
||||||
|
&self.fork_context.spec,
|
||||||
) {
|
) {
|
||||||
let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest);
|
let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest);
|
||||||
self.subscribe(topic);
|
self.subscribe(topic);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(das): unsubscribe from blob topics at the Fulu fork
|
// Already registered all possible gossipsub topics for metrics
|
||||||
|
|
||||||
// Register the new topics for metrics
|
|
||||||
let topics_to_keep_metrics_for = attestation_sync_committee_topics::<E>()
|
|
||||||
.map(|gossip_kind| {
|
|
||||||
Topic::from(GossipTopic::new(
|
|
||||||
gossip_kind,
|
|
||||||
GossipEncoding::default(),
|
|
||||||
new_fork_digest,
|
|
||||||
))
|
|
||||||
.into()
|
|
||||||
})
|
|
||||||
.collect::<Vec<TopicHash>>();
|
|
||||||
self.gossipsub_mut()
|
|
||||||
.register_topics_for_metrics(topics_to_keep_metrics_for);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Unsubscribe from all topics that doesn't have the given fork_digest
|
/// Unsubscribe from all topics that doesn't have the given fork_digest
|
||||||
|
|||||||
@@ -187,6 +187,8 @@ impl<E: EthSpec> NetworkGlobals<E> {
|
|||||||
/// Returns the TopicConfig to compute the set of Gossip topics for a given fork
|
/// Returns the TopicConfig to compute the set of Gossip topics for a given fork
|
||||||
pub fn as_topic_config(&self) -> TopicConfig {
|
pub fn as_topic_config(&self) -> TopicConfig {
|
||||||
TopicConfig {
|
TopicConfig {
|
||||||
|
enable_light_client_server: self.config.enable_light_client_server,
|
||||||
|
subscribe_all_subnets: self.config.subscribe_all_subnets,
|
||||||
subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets,
|
subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets,
|
||||||
sampling_subnets: &self.sampling_subnets,
|
sampling_subnets: &self.sampling_subnets,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,7 +16,6 @@ pub use pubsub::{PubsubMessage, SnappyTransform};
|
|||||||
pub use subnet::{Subnet, SubnetDiscovery};
|
pub use subnet::{Subnet, SubnetDiscovery};
|
||||||
pub use sync_state::{BackFillState, SyncState};
|
pub use sync_state::{BackFillState, SyncState};
|
||||||
pub use topics::{
|
pub use topics::{
|
||||||
attestation_sync_committee_topics, core_topics_to_subscribe, fork_core_topics,
|
all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash,
|
||||||
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, TopicConfig,
|
GossipEncoding, GossipKind, GossipTopic, TopicConfig,
|
||||||
ALTAIR_CORE_TOPICS, BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
|
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -25,104 +25,110 @@ pub const BLS_TO_EXECUTION_CHANGE_TOPIC: &str = "bls_to_execution_change";
|
|||||||
pub const LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update";
|
pub const LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update";
|
||||||
pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update";
|
pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update";
|
||||||
|
|
||||||
pub const BASE_CORE_TOPICS: [GossipKind; 5] = [
|
|
||||||
GossipKind::BeaconBlock,
|
|
||||||
GossipKind::BeaconAggregateAndProof,
|
|
||||||
GossipKind::VoluntaryExit,
|
|
||||||
GossipKind::ProposerSlashing,
|
|
||||||
GossipKind::AttesterSlashing,
|
|
||||||
];
|
|
||||||
|
|
||||||
pub const ALTAIR_CORE_TOPICS: [GossipKind; 1] = [GossipKind::SignedContributionAndProof];
|
|
||||||
|
|
||||||
pub const CAPELLA_CORE_TOPICS: [GossipKind; 1] = [GossipKind::BlsToExecutionChange];
|
|
||||||
|
|
||||||
pub const LIGHT_CLIENT_GOSSIP_TOPICS: [GossipKind; 2] = [
|
|
||||||
GossipKind::LightClientFinalityUpdate,
|
|
||||||
GossipKind::LightClientOptimisticUpdate,
|
|
||||||
];
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct TopicConfig<'a> {
|
pub struct TopicConfig<'a> {
|
||||||
|
pub enable_light_client_server: bool,
|
||||||
|
pub subscribe_all_subnets: bool,
|
||||||
pub subscribe_all_data_column_subnets: bool,
|
pub subscribe_all_data_column_subnets: bool,
|
||||||
pub sampling_subnets: &'a HashSet<DataColumnSubnetId>,
|
pub sampling_subnets: &'a HashSet<DataColumnSubnetId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the core topics associated with each fork that are new to the previous fork
|
/// Returns all the topics the node should subscribe at `fork_name`
|
||||||
pub fn fork_core_topics<E: EthSpec>(
|
|
||||||
fork_name: &ForkName,
|
|
||||||
spec: &ChainSpec,
|
|
||||||
topic_config: &TopicConfig,
|
|
||||||
) -> Vec<GossipKind> {
|
|
||||||
match fork_name {
|
|
||||||
ForkName::Base => BASE_CORE_TOPICS.to_vec(),
|
|
||||||
ForkName::Altair => ALTAIR_CORE_TOPICS.to_vec(),
|
|
||||||
ForkName::Bellatrix => vec![],
|
|
||||||
ForkName::Capella => CAPELLA_CORE_TOPICS.to_vec(),
|
|
||||||
ForkName::Deneb => {
|
|
||||||
// All of deneb blob topics are core topics
|
|
||||||
let mut deneb_blob_topics = Vec::new();
|
|
||||||
for i in 0..spec.blob_sidecar_subnet_count(ForkName::Deneb) {
|
|
||||||
deneb_blob_topics.push(GossipKind::BlobSidecar(i));
|
|
||||||
}
|
|
||||||
deneb_blob_topics
|
|
||||||
}
|
|
||||||
ForkName::Electra => {
|
|
||||||
// All of electra blob topics are core topics
|
|
||||||
let mut electra_blob_topics = Vec::new();
|
|
||||||
for i in 0..spec.blob_sidecar_subnet_count(ForkName::Electra) {
|
|
||||||
electra_blob_topics.push(GossipKind::BlobSidecar(i));
|
|
||||||
}
|
|
||||||
electra_blob_topics
|
|
||||||
}
|
|
||||||
ForkName::Fulu => {
|
|
||||||
let mut topics = vec![];
|
|
||||||
if topic_config.subscribe_all_data_column_subnets {
|
|
||||||
for column_subnet in 0..spec.data_column_sidecar_subnet_count {
|
|
||||||
topics.push(GossipKind::DataColumnSidecar(DataColumnSubnetId::new(
|
|
||||||
column_subnet,
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
for column_subnet in topic_config.sampling_subnets {
|
|
||||||
topics.push(GossipKind::DataColumnSidecar(*column_subnet));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
topics
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns all the attestation and sync committee topics, for a given fork.
|
|
||||||
pub fn attestation_sync_committee_topics<E: EthSpec>() -> impl Iterator<Item = GossipKind> {
|
|
||||||
(0..E::SubnetBitfieldLength::to_usize())
|
|
||||||
.map(|subnet_id| GossipKind::Attestation(SubnetId::new(subnet_id as u64)))
|
|
||||||
.chain(
|
|
||||||
(0..E::SyncCommitteeSubnetCount::to_usize()).map(|sync_committee_id| {
|
|
||||||
GossipKind::SyncCommitteeMessage(SyncSubnetId::new(sync_committee_id as u64))
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns all the topics that we need to subscribe to for a given fork
|
|
||||||
/// including topics from older forks and new topics for the current fork.
|
|
||||||
pub fn core_topics_to_subscribe<E: EthSpec>(
|
pub fn core_topics_to_subscribe<E: EthSpec>(
|
||||||
mut current_fork: ForkName,
|
fork_name: ForkName,
|
||||||
|
opts: &TopicConfig,
|
||||||
spec: &ChainSpec,
|
spec: &ChainSpec,
|
||||||
topic_config: &TopicConfig,
|
|
||||||
) -> Vec<GossipKind> {
|
) -> Vec<GossipKind> {
|
||||||
let mut topics = fork_core_topics::<E>(¤t_fork, spec, topic_config);
|
let mut topics = vec![
|
||||||
while let Some(previous_fork) = current_fork.previous_fork() {
|
GossipKind::BeaconBlock,
|
||||||
let previous_fork_topics = fork_core_topics::<E>(&previous_fork, spec, topic_config);
|
GossipKind::BeaconAggregateAndProof,
|
||||||
topics.extend(previous_fork_topics);
|
GossipKind::VoluntaryExit,
|
||||||
current_fork = previous_fork;
|
GossipKind::ProposerSlashing,
|
||||||
|
GossipKind::AttesterSlashing,
|
||||||
|
];
|
||||||
|
|
||||||
|
if opts.subscribe_all_subnets {
|
||||||
|
for i in 0..spec.attestation_subnet_count {
|
||||||
|
topics.push(GossipKind::Attestation(i.into()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// Remove duplicates
|
|
||||||
|
if fork_name.altair_enabled() {
|
||||||
|
topics.push(GossipKind::SignedContributionAndProof);
|
||||||
|
|
||||||
|
if opts.subscribe_all_subnets {
|
||||||
|
for i in 0..E::SyncCommitteeSubnetCount::to_u64() {
|
||||||
|
topics.push(GossipKind::SyncCommitteeMessage(i.into()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if opts.enable_light_client_server {
|
||||||
|
topics.push(GossipKind::LightClientFinalityUpdate);
|
||||||
|
topics.push(GossipKind::LightClientOptimisticUpdate);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if fork_name.capella_enabled() {
|
||||||
|
topics.push(GossipKind::BlsToExecutionChange);
|
||||||
|
}
|
||||||
|
|
||||||
|
if fork_name.deneb_enabled() && !fork_name.fulu_enabled() {
|
||||||
|
// All of deneb blob topics are core topics
|
||||||
|
for i in 0..spec.blob_sidecar_subnet_count(fork_name) {
|
||||||
|
topics.push(GossipKind::BlobSidecar(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if fork_name.fulu_enabled() {
|
||||||
|
if opts.subscribe_all_data_column_subnets {
|
||||||
|
for i in 0..spec.data_column_sidecar_subnet_count {
|
||||||
|
topics.push(GossipKind::DataColumnSidecar(i.into()));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for subnet in opts.sampling_subnets {
|
||||||
|
topics.push(GossipKind::DataColumnSidecar(*subnet));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
topics
|
topics
|
||||||
.into_iter()
|
}
|
||||||
.collect::<std::collections::HashSet<_>>()
|
|
||||||
.into_iter()
|
/// Returns true if a given non-core `GossipTopic` MAY be subscribe at this fork.
|
||||||
.collect()
|
///
|
||||||
|
/// For example: the `Attestation` topic is not subscribed as a core topic if
|
||||||
|
/// subscribe_all_subnets = false` but we may subscribe to it outside of a fork
|
||||||
|
/// boundary if the node is an aggregator.
|
||||||
|
pub fn is_fork_non_core_topic(topic: &GossipTopic, _fork_name: ForkName) -> bool {
|
||||||
|
match topic.kind() {
|
||||||
|
// Node may be aggregator of attestation and sync_committee_message topics for all known
|
||||||
|
// forks
|
||||||
|
GossipKind::Attestation(_) | GossipKind::SyncCommitteeMessage(_) => true,
|
||||||
|
// All these topics are core-only
|
||||||
|
GossipKind::BeaconBlock
|
||||||
|
| GossipKind::BeaconAggregateAndProof
|
||||||
|
| GossipKind::BlobSidecar(_)
|
||||||
|
| GossipKind::DataColumnSidecar(_)
|
||||||
|
| GossipKind::VoluntaryExit
|
||||||
|
| GossipKind::ProposerSlashing
|
||||||
|
| GossipKind::AttesterSlashing
|
||||||
|
| GossipKind::SignedContributionAndProof
|
||||||
|
| GossipKind::BlsToExecutionChange
|
||||||
|
| GossipKind::LightClientFinalityUpdate
|
||||||
|
| GossipKind::LightClientOptimisticUpdate => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn all_topics_at_fork<E: EthSpec>(fork: ForkName, spec: &ChainSpec) -> Vec<GossipKind> {
|
||||||
|
// Compute the worst case of all forks
|
||||||
|
let sampling_subnets = HashSet::from_iter(spec.all_data_column_sidecar_subnets());
|
||||||
|
let opts = TopicConfig {
|
||||||
|
enable_light_client_server: true,
|
||||||
|
subscribe_all_subnets: true,
|
||||||
|
subscribe_all_data_column_subnets: true,
|
||||||
|
sampling_subnets: &sampling_subnets,
|
||||||
|
};
|
||||||
|
core_topics_to_subscribe::<E>(fork, &opts, spec)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A gossipsub topic which encapsulates the type of messages that should be sent and received over
|
/// A gossipsub topic which encapsulates the type of messages that should be sent and received over
|
||||||
@@ -368,10 +374,9 @@ fn subnet_topic_index(topic: &str) -> Option<GossipKind> {
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use types::MainnetEthSpec;
|
|
||||||
|
|
||||||
use super::GossipKind::*;
|
use super::GossipKind::*;
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use types::{Epoch, MainnetEthSpec as E};
|
||||||
|
|
||||||
const GOOD_FORK_DIGEST: &str = "e1925f3b";
|
const GOOD_FORK_DIGEST: &str = "e1925f3b";
|
||||||
const BAD_PREFIX: &str = "tezos";
|
const BAD_PREFIX: &str = "tezos";
|
||||||
@@ -496,31 +501,94 @@ mod tests {
|
|||||||
assert_eq!("attester_slashing", AttesterSlashing.as_ref());
|
assert_eq!("attester_slashing", AttesterSlashing.as_ref());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_spec() -> ChainSpec {
|
||||||
|
let mut spec = E::default_spec();
|
||||||
|
spec.altair_fork_epoch = Some(Epoch::new(1));
|
||||||
|
spec.bellatrix_fork_epoch = Some(Epoch::new(2));
|
||||||
|
spec.capella_fork_epoch = Some(Epoch::new(3));
|
||||||
|
spec.deneb_fork_epoch = Some(Epoch::new(4));
|
||||||
|
spec.electra_fork_epoch = Some(Epoch::new(5));
|
||||||
|
spec.fulu_fork_epoch = Some(Epoch::new(6));
|
||||||
|
spec
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_sampling_subnets() -> HashSet<DataColumnSubnetId> {
|
||||||
|
HashSet::new()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_topic_config(sampling_subnets: &HashSet<DataColumnSubnetId>) -> TopicConfig {
|
||||||
|
TopicConfig {
|
||||||
|
enable_light_client_server: false,
|
||||||
|
subscribe_all_subnets: false,
|
||||||
|
subscribe_all_data_column_subnets: false,
|
||||||
|
sampling_subnets,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn base_topics_are_always_active() {
|
||||||
|
let spec = get_spec();
|
||||||
|
let s = get_sampling_subnets();
|
||||||
|
let topic_config = get_topic_config(&s);
|
||||||
|
for fork in ForkName::list_all() {
|
||||||
|
assert!(core_topics_to_subscribe::<E>(fork, &topic_config, &spec,)
|
||||||
|
.contains(&GossipKind::BeaconBlock));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn blobs_are_not_subscribed_in_peerdas() {
|
||||||
|
let spec = get_spec();
|
||||||
|
let s = get_sampling_subnets();
|
||||||
|
let topic_config = get_topic_config(&s);
|
||||||
|
assert!(
|
||||||
|
!core_topics_to_subscribe::<E>(ForkName::Fulu, &topic_config, &spec,)
|
||||||
|
.contains(&GossipKind::BlobSidecar(0))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn columns_are_subscribed_in_peerdas() {
|
||||||
|
let spec = get_spec();
|
||||||
|
let s = get_sampling_subnets();
|
||||||
|
let mut topic_config = get_topic_config(&s);
|
||||||
|
topic_config.subscribe_all_data_column_subnets = true;
|
||||||
|
assert!(
|
||||||
|
core_topics_to_subscribe::<E>(ForkName::Fulu, &topic_config, &spec)
|
||||||
|
.contains(&GossipKind::DataColumnSidecar(0.into()))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_core_topics_to_subscribe() {
|
fn test_core_topics_to_subscribe() {
|
||||||
type E = MainnetEthSpec;
|
let spec = get_spec();
|
||||||
let spec = E::default_spec();
|
let s = HashSet::from_iter([1, 2].map(DataColumnSubnetId::new));
|
||||||
let mut all_topics = Vec::new();
|
let mut topic_config = get_topic_config(&s);
|
||||||
let topic_config = TopicConfig {
|
topic_config.enable_light_client_server = true;
|
||||||
subscribe_all_data_column_subnets: false,
|
|
||||||
sampling_subnets: &HashSet::from_iter([1, 2].map(DataColumnSubnetId::new)),
|
|
||||||
};
|
|
||||||
let mut fulu_core_topics = fork_core_topics::<E>(&ForkName::Fulu, &spec, &topic_config);
|
|
||||||
let mut electra_core_topics =
|
|
||||||
fork_core_topics::<E>(&ForkName::Electra, &spec, &topic_config);
|
|
||||||
let mut deneb_core_topics = fork_core_topics::<E>(&ForkName::Deneb, &spec, &topic_config);
|
|
||||||
all_topics.append(&mut fulu_core_topics);
|
|
||||||
all_topics.append(&mut electra_core_topics);
|
|
||||||
all_topics.append(&mut deneb_core_topics);
|
|
||||||
all_topics.extend(CAPELLA_CORE_TOPICS);
|
|
||||||
all_topics.extend(ALTAIR_CORE_TOPICS);
|
|
||||||
all_topics.extend(BASE_CORE_TOPICS);
|
|
||||||
|
|
||||||
let latest_fork = *ForkName::list_all().last().unwrap();
|
let latest_fork = *ForkName::list_all().last().unwrap();
|
||||||
let core_topics = core_topics_to_subscribe::<E>(latest_fork, &spec, &topic_config);
|
let topics = core_topics_to_subscribe::<E>(latest_fork, &topic_config, &spec);
|
||||||
|
|
||||||
|
let mut expected_topics = vec![
|
||||||
|
GossipKind::BeaconBlock,
|
||||||
|
GossipKind::BeaconAggregateAndProof,
|
||||||
|
GossipKind::VoluntaryExit,
|
||||||
|
GossipKind::ProposerSlashing,
|
||||||
|
GossipKind::AttesterSlashing,
|
||||||
|
GossipKind::SignedContributionAndProof,
|
||||||
|
GossipKind::LightClientFinalityUpdate,
|
||||||
|
GossipKind::LightClientOptimisticUpdate,
|
||||||
|
GossipKind::BlsToExecutionChange,
|
||||||
|
];
|
||||||
|
for subnet in s {
|
||||||
|
expected_topics.push(GossipKind::DataColumnSidecar(subnet));
|
||||||
|
}
|
||||||
// Need to check all the topics exist in an order independent manner
|
// Need to check all the topics exist in an order independent manner
|
||||||
for topic in all_topics {
|
for expected_topic in expected_topics {
|
||||||
assert!(core_topics.contains(&topic));
|
assert!(
|
||||||
|
topics.contains(&expected_topic),
|
||||||
|
"Should contain {:?}",
|
||||||
|
expected_topic
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -181,8 +181,6 @@ pub struct NetworkService<T: BeaconChainTypes> {
|
|||||||
next_fork_subscriptions: Pin<Box<OptionFuture<Sleep>>>,
|
next_fork_subscriptions: Pin<Box<OptionFuture<Sleep>>>,
|
||||||
/// A delay that expires when we need to unsubscribe from old fork topics.
|
/// A delay that expires when we need to unsubscribe from old fork topics.
|
||||||
next_unsubscribe: Pin<Box<OptionFuture<Sleep>>>,
|
next_unsubscribe: Pin<Box<OptionFuture<Sleep>>>,
|
||||||
/// Subscribe to all the subnets once synced.
|
|
||||||
subscribe_all_subnets: bool,
|
|
||||||
/// Shutdown beacon node after sync is complete.
|
/// Shutdown beacon node after sync is complete.
|
||||||
shutdown_after_sync: bool,
|
shutdown_after_sync: bool,
|
||||||
/// Whether metrics are enabled or not.
|
/// Whether metrics are enabled or not.
|
||||||
@@ -191,8 +189,6 @@ pub struct NetworkService<T: BeaconChainTypes> {
|
|||||||
metrics_update: tokio::time::Interval,
|
metrics_update: tokio::time::Interval,
|
||||||
/// gossipsub_parameter_update timer
|
/// gossipsub_parameter_update timer
|
||||||
gossipsub_parameter_update: tokio::time::Interval,
|
gossipsub_parameter_update: tokio::time::Interval,
|
||||||
/// enable_light_client_server indicator
|
|
||||||
enable_light_client_server: bool,
|
|
||||||
/// The logger for the network service.
|
/// The logger for the network service.
|
||||||
fork_context: Arc<ForkContext>,
|
fork_context: Arc<ForkContext>,
|
||||||
log: slog::Logger,
|
log: slog::Logger,
|
||||||
@@ -347,14 +343,12 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
|||||||
next_fork_update,
|
next_fork_update,
|
||||||
next_fork_subscriptions,
|
next_fork_subscriptions,
|
||||||
next_unsubscribe,
|
next_unsubscribe,
|
||||||
subscribe_all_subnets: config.subscribe_all_subnets,
|
|
||||||
shutdown_after_sync: config.shutdown_after_sync,
|
shutdown_after_sync: config.shutdown_after_sync,
|
||||||
metrics_enabled: config.metrics_enabled,
|
metrics_enabled: config.metrics_enabled,
|
||||||
metrics_update,
|
metrics_update,
|
||||||
gossipsub_parameter_update,
|
gossipsub_parameter_update,
|
||||||
fork_context,
|
fork_context,
|
||||||
log: network_log,
|
log: network_log,
|
||||||
enable_light_client_server: config.enable_light_client_server,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok((network_service, network_globals, network_senders))
|
Ok((network_service, network_globals, network_senders))
|
||||||
@@ -713,8 +707,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
|||||||
let mut subscribed_topics: Vec<GossipTopic> = vec![];
|
let mut subscribed_topics: Vec<GossipTopic> = vec![];
|
||||||
for topic_kind in core_topics_to_subscribe::<T::EthSpec>(
|
for topic_kind in core_topics_to_subscribe::<T::EthSpec>(
|
||||||
self.fork_context.current_fork(),
|
self.fork_context.current_fork(),
|
||||||
&self.fork_context.spec,
|
|
||||||
&self.network_globals.as_topic_config(),
|
&self.network_globals.as_topic_config(),
|
||||||
|
&self.fork_context.spec,
|
||||||
) {
|
) {
|
||||||
for fork_digest in self.required_gossip_fork_digests() {
|
for fork_digest in self.required_gossip_fork_digests() {
|
||||||
let topic = GossipTopic::new(
|
let topic = GossipTopic::new(
|
||||||
@@ -730,57 +724,18 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.enable_light_client_server {
|
|
||||||
for light_client_topic_kind in
|
|
||||||
lighthouse_network::types::LIGHT_CLIENT_GOSSIP_TOPICS.iter()
|
|
||||||
{
|
|
||||||
for fork_digest in self.required_gossip_fork_digests() {
|
|
||||||
let light_client_topic = GossipTopic::new(
|
|
||||||
light_client_topic_kind.clone(),
|
|
||||||
GossipEncoding::default(),
|
|
||||||
fork_digest,
|
|
||||||
);
|
|
||||||
if self.libp2p.subscribe(light_client_topic.clone()) {
|
|
||||||
subscribed_topics.push(light_client_topic);
|
|
||||||
} else {
|
|
||||||
warn!(self.log, "Could not subscribe to topic"; "topic" => %light_client_topic);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we are to subscribe to all subnets we do it here
|
// If we are to subscribe to all subnets we do it here
|
||||||
if self.subscribe_all_subnets {
|
if self.network_globals.config.subscribe_all_subnets {
|
||||||
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {
|
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {
|
||||||
let subnet = Subnet::Attestation(SubnetId::new(subnet_id));
|
let subnet = Subnet::Attestation(SubnetId::new(subnet_id));
|
||||||
// Update the ENR bitfield
|
// Update the ENR bitfield
|
||||||
self.libp2p.update_enr_subnet(subnet, true);
|
self.libp2p.update_enr_subnet(subnet, true);
|
||||||
for fork_digest in self.required_gossip_fork_digests() {
|
|
||||||
let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
|
|
||||||
if self.libp2p.subscribe(topic.clone()) {
|
|
||||||
subscribed_topics.push(topic);
|
|
||||||
} else {
|
|
||||||
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
let subnet_max = <<T as BeaconChainTypes>::EthSpec as EthSpec>::SyncCommitteeSubnetCount::to_u64();
|
let subnet_max = <<T as BeaconChainTypes>::EthSpec as EthSpec>::SyncCommitteeSubnetCount::to_u64();
|
||||||
for subnet_id in 0..subnet_max {
|
for subnet_id in 0..subnet_max {
|
||||||
let subnet = Subnet::SyncCommittee(SyncSubnetId::new(subnet_id));
|
let subnet = Subnet::SyncCommittee(SyncSubnetId::new(subnet_id));
|
||||||
// Update the ENR bitfield
|
// Update the ENR bitfield
|
||||||
self.libp2p.update_enr_subnet(subnet, true);
|
self.libp2p.update_enr_subnet(subnet, true);
|
||||||
for fork_digest in self.required_gossip_fork_digests() {
|
|
||||||
let topic = GossipTopic::new(
|
|
||||||
subnet.into(),
|
|
||||||
GossipEncoding::default(),
|
|
||||||
fork_digest,
|
|
||||||
);
|
|
||||||
if self.libp2p.subscribe(topic.clone()) {
|
|
||||||
subscribed_topics.push(topic);
|
|
||||||
} else {
|
|
||||||
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -909,8 +864,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
|||||||
fn subscribed_core_topics(&self) -> bool {
|
fn subscribed_core_topics(&self) -> bool {
|
||||||
let core_topics = core_topics_to_subscribe::<T::EthSpec>(
|
let core_topics = core_topics_to_subscribe::<T::EthSpec>(
|
||||||
self.fork_context.current_fork(),
|
self.fork_context.current_fork(),
|
||||||
&self.fork_context.spec,
|
|
||||||
&self.network_globals.as_topic_config(),
|
&self.network_globals.as_topic_config(),
|
||||||
|
&self.fork_context.spec,
|
||||||
);
|
);
|
||||||
let core_topics: HashSet<&GossipKind> = HashSet::from_iter(&core_topics);
|
let core_topics: HashSet<&GossipKind> = HashSet::from_iter(&core_topics);
|
||||||
let subscriptions = self.network_globals.gossipsub_subscriptions.read();
|
let subscriptions = self.network_globals.gossipsub_subscriptions.read();
|
||||||
|
|||||||
@@ -712,6 +712,10 @@ impl ChainSpec {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn all_data_column_sidecar_subnets(&self) -> impl Iterator<Item = DataColumnSubnetId> {
|
||||||
|
(0..self.data_column_sidecar_subnet_count).map(DataColumnSubnetId::new)
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns a `ChainSpec` compatible with the Ethereum Foundation specification.
|
/// Returns a `ChainSpec` compatible with the Ethereum Foundation specification.
|
||||||
pub fn mainnet() -> Self {
|
pub fn mainnet() -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
|||||||
Reference in New Issue
Block a user