diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 0110eb9581..e20b86e546 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -13,8 +13,8 @@ use crate::rpc::*; use crate::service::behaviour::BehaviourEvent; pub use crate::service::behaviour::Gossipsub; use crate::types::{ - subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, - SubnetDiscovery, + fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, + SnappyTransform, Subnet, SubnetDiscovery, }; use crate::EnrExt; use crate::Eth2Enr; @@ -41,6 +41,7 @@ use std::{ sync::Arc, task::{Context, Poll}, }; +use types::ForkName; use types::{ consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId, }; @@ -559,13 +560,20 @@ impl Network { self.unsubscribe(gossip_topic) } - /// Subscribe to all currently subscribed topics with the new fork digest. - pub fn subscribe_new_fork_topics(&mut self, new_fork_digest: [u8; 4]) { + /// Subscribe to all required topics for the `new_fork` with the given `new_fork_digest`. + pub fn subscribe_new_fork_topics(&mut self, new_fork: ForkName, new_fork_digest: [u8; 4]) { + // Subscribe to existing topics with new fork digest let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone(); for mut topic in subscriptions.into_iter() { topic.fork_digest = new_fork_digest; self.subscribe(topic); } + + // Subscribe to core topics for the new fork + for kind in fork_core_topics(&new_fork) { + let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest); + self.subscribe(topic); + } } /// Unsubscribe from all topics that doesn't have the given fork_digest diff --git a/beacon_node/lighthouse_network/src/types/mod.rs b/beacon_node/lighthouse_network/src/types/mod.rs index 2a5ca6c806..e7457f25da 100644 --- a/beacon_node/lighthouse_network/src/types/mod.rs +++ b/beacon_node/lighthouse_network/src/types/mod.rs @@ -17,6 +17,6 @@ pub use pubsub::{PubsubMessage, SnappyTransform}; pub use subnet::{Subnet, SubnetDiscovery}; pub use sync_state::{BackFillState, SyncState}; pub use topics::{ - subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS, - LIGHT_CLIENT_GOSSIP_TOPICS, + core_topics_to_subscribe, fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind, + GossipTopic, LIGHT_CLIENT_GOSSIP_TOPICS, }; diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index 8194fa63bd..0e4aefbb5c 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -1,7 +1,7 @@ use libp2p::gossipsub::{IdentTopic as Topic, TopicHash}; use serde_derive::{Deserialize, Serialize}; use strum::AsRefStr; -use types::{SubnetId, SyncSubnetId}; +use types::{ForkName, SubnetId, SyncSubnetId}; use crate::Subnet; @@ -22,21 +22,45 @@ pub const BLS_TO_EXECUTION_CHANGE_TOPIC: &str = "bls_to_execution_change"; pub const LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update"; pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update"; -pub const CORE_TOPICS: [GossipKind; 7] = [ +pub const BASE_CORE_TOPICS: [GossipKind; 5] = [ GossipKind::BeaconBlock, GossipKind::BeaconAggregateAndProof, GossipKind::VoluntaryExit, GossipKind::ProposerSlashing, GossipKind::AttesterSlashing, - GossipKind::SignedContributionAndProof, - GossipKind::BlsToExecutionChange, ]; +pub const ALTAIR_CORE_TOPICS: [GossipKind; 1] = [GossipKind::SignedContributionAndProof]; + +pub const CAPELLA_CORE_TOPICS: [GossipKind; 1] = [GossipKind::BlsToExecutionChange]; + pub const LIGHT_CLIENT_GOSSIP_TOPICS: [GossipKind; 2] = [ GossipKind::LightClientFinalityUpdate, GossipKind::LightClientOptimisticUpdate, ]; +/// Returns the core topics associated with each fork that are new to the previous fork +pub fn fork_core_topics(fork_name: &ForkName) -> Vec { + match fork_name { + ForkName::Base => BASE_CORE_TOPICS.to_vec(), + ForkName::Altair => ALTAIR_CORE_TOPICS.to_vec(), + ForkName::Merge => vec![], + ForkName::Capella => CAPELLA_CORE_TOPICS.to_vec(), + } +} + +/// Returns all the topics that we need to subscribe to for a given fork +/// including topics from older forks and new topics for the current fork. +pub fn core_topics_to_subscribe(mut current_fork: ForkName) -> Vec { + let mut topics = fork_core_topics(¤t_fork); + while let Some(previous_fork) = current_fork.previous_fork() { + let previous_fork_topics = fork_core_topics(&previous_fork); + topics.extend(previous_fork_topics); + current_fork = previous_fork; + } + topics +} + /// A gossipsub topic which encapsulates the type of messages that should be sent and received over /// the pubsub protocol and the way the messages should be encoded. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] @@ -390,4 +414,15 @@ mod tests { assert_eq!("proposer_slashing", ProposerSlashing.as_ref()); assert_eq!("attester_slashing", AttesterSlashing.as_ref()); } + + #[test] + fn test_core_topics_to_subscribe() { + let mut all_topics = Vec::new(); + all_topics.extend(CAPELLA_CORE_TOPICS); + all_topics.extend(ALTAIR_CORE_TOPICS); + all_topics.extend(BASE_CORE_TOPICS); + + let latest_fork = *ForkName::list_all().last().unwrap(); + assert_eq!(core_topics_to_subscribe(latest_fork), all_topics); + } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 4568ed1a22..410461bcd3 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -19,7 +19,7 @@ use lighthouse_network::{ Context, PeerAction, PeerRequestId, PubsubMessage, ReportSource, Request, Response, Subnet, }; use lighthouse_network::{ - types::{GossipEncoding, GossipTopic}, + types::{core_topics_to_subscribe, GossipEncoding, GossipTopic}, MessageId, NetworkEvent, NetworkGlobals, PeerId, }; use slog::{crit, debug, error, info, o, trace, warn}; @@ -445,7 +445,7 @@ impl NetworkService { let fork_version = self.beacon_chain.spec.fork_version_for_name(fork_name); let fork_digest = ChainSpec::compute_fork_digest(fork_version, self.beacon_chain.genesis_validators_root); info!(self.log, "Subscribing to new fork topics"); - self.libp2p.subscribe_new_fork_topics(fork_digest); + self.libp2p.subscribe_new_fork_topics(fork_name, fork_digest); self.next_fork_subscriptions = Box::pin(None.into()); } else { @@ -684,7 +684,7 @@ impl NetworkService { } let mut subscribed_topics: Vec = vec![]; - for topic_kind in lighthouse_network::types::CORE_TOPICS.iter() { + for topic_kind in core_topics_to_subscribe(self.fork_context.current_fork()) { for fork_digest in self.required_gossip_fork_digests() { let topic = GossipTopic::new( topic_kind.clone(),