diff --git a/beacon_node/eth2_libp2p/src/config.rs b/beacon_node/eth2_libp2p/src/config.rs index b2ee8558cf..73094642d7 100644 --- a/beacon_node/eth2_libp2p/src/config.rs +++ b/beacon_node/eth2_libp2p/src/config.rs @@ -78,15 +78,6 @@ impl Default for Config { network_dir.push(".lighthouse"); network_dir.push("network"); - // The default topics that we will initially subscribe to - let topics = vec![ - GossipKind::BeaconBlock, - GossipKind::BeaconAggregateAndProof, - GossipKind::VoluntaryExit, - GossipKind::ProposerSlashing, - GossipKind::AttesterSlashing, - ]; - // The function used to generate a gossipsub message id // We use the first 8 bytes of SHA256(data) for content addressing let gossip_message_id = @@ -145,7 +136,7 @@ impl Default for Config { trusted_peers: vec![], client_version: lighthouse_version::version_with_platform(), disable_discovery: false, - topics, + topics: Vec::new(), } } } diff --git a/beacon_node/eth2_libp2p/src/service.rs b/beacon_node/eth2_libp2p/src/service.rs index 1d594918d5..52286c05d3 100644 --- a/beacon_node/eth2_libp2p/src/service.rs +++ b/beacon_node/eth2_libp2p/src/service.rs @@ -207,7 +207,9 @@ impl Service { warn!(log, "Could not subscribe to topic"; "topic" => format!("{}",topic_kind)); } } - info!(log, "Subscribed to topics"; "topics" => format!("{:?}", subscribed_topics)); + if !subscribed_topics.is_empty() { + info!(log, "Subscribed to topics"; "topics" => format!("{:?}", subscribed_topics)); + } let service = Service { local_peer_id, diff --git a/beacon_node/eth2_libp2p/src/types/mod.rs b/beacon_node/eth2_libp2p/src/types/mod.rs index ec6fcd4af4..762ea7d740 100644 --- a/beacon_node/eth2_libp2p/src/types/mod.rs +++ b/beacon_node/eth2_libp2p/src/types/mod.rs @@ -16,4 +16,4 @@ pub use globals::NetworkGlobals; pub use pubsub::PubsubMessage; pub use subnet::SubnetDiscovery; pub use sync_state::SyncState; -pub use topics::{GossipEncoding, GossipKind, GossipTopic}; +pub use topics::{GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS}; diff --git a/beacon_node/eth2_libp2p/src/types/topics.rs b/beacon_node/eth2_libp2p/src/types/topics.rs index f564a54137..3f120b3ec1 100644 --- a/beacon_node/eth2_libp2p/src/types/topics.rs +++ b/beacon_node/eth2_libp2p/src/types/topics.rs @@ -14,6 +14,14 @@ pub const VOLUNTARY_EXIT_TOPIC: &str = "voluntary_exit"; pub const PROPOSER_SLASHING_TOPIC: &str = "proposer_slashing"; pub const ATTESTER_SLASHING_TOPIC: &str = "attester_slashing"; +pub const CORE_TOPICS: [GossipKind; 5] = [ + GossipKind::BeaconBlock, + GossipKind::BeaconAggregateAndProof, + GossipKind::VoluntaryExit, + GossipKind::ProposerSlashing, + GossipKind::AttesterSlashing, +]; + /// A gossipsub topic which encapsulates the type of messages that should be sent and received over /// the pubsub protocol and the way the messages should be encoded. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 84f807007f..a018750f3f 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -35,6 +35,9 @@ pub enum NetworkMessage { Subscribe { subscriptions: Vec, }, + /// Subscribes the beacon node to the core gossipsub topics. We do this when we are either + /// synced or close to the head slot. + SubscribeCoreTopics, /// Send an RPC request to the libp2p service. SendRequest { peer_id: PeerId, @@ -278,6 +281,21 @@ fn spawn_service( warn!(service.log, "Validator subscription failed"; "error" => e); } } + NetworkMessage::SubscribeCoreTopics => { + let mut subscribed_topics: Vec = vec![]; + let already_subscribed = service.network_globals.gossipsub_subscriptions.read().clone(); + let already_subscribed = already_subscribed.iter().map(|x| x.kind()).collect::>(); + for topic_kind in eth2_libp2p::types::CORE_TOPICS.iter().filter(|topic| already_subscribed.get(topic).is_none()) { + if service.libp2p.swarm.subscribe_kind(topic_kind.clone()) { + subscribed_topics.push(topic_kind.clone()); + } else { + warn!(service.log, "Could not subscribe to topic"; "topic" => format!("{}",topic_kind)); + } + } + if !subscribed_topics.is_empty() { + info!(service.log, "Subscribed to topics"; "topics" => format!("{:?}", subscribed_topics)); + } + } } } // process any attestation service events diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 3aa5577d70..f147944478 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -57,7 +57,11 @@ use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a /// fully sync'd peer. -pub const SLOT_IMPORT_TOLERANCE: usize = 20; +/// +/// This means that we consider ourselves synced (and hence subscribe to all subnets and block +/// gossip if no peers are further than this range ahead of us that we have not already downloaded +/// blocks for. +pub const SLOT_IMPORT_TOLERANCE: usize = 32; /// How many attempts we try to find a parent of a block before we give up trying . const PARENT_FAIL_TOLERANCE: usize = 5; /// The maximum depth we will search for a parent block. In principle we should have sync'd any @@ -137,7 +141,7 @@ struct ParentRequests { failed_attempts: usize, /// The peer who last submitted a block. If the chain ends or fails, this is the peer that is - /// downvoted. + /// penalized. last_submitted_peer: PeerId, /// The request ID of this lookup is in progress. @@ -277,7 +281,7 @@ impl SyncManager { ); self.synced_peer(&peer_id, remote); // notify the range sync that a peer has been added - self.range_sync.fully_synced_peer_found(); + self.range_sync.fully_synced_peer_found(&mut self.network); } PeerSyncType::Advanced => { trace!(self.log, "Useful peer for sync found"; @@ -303,7 +307,7 @@ impl SyncManager { { self.synced_peer(&peer_id, remote); // notify the range sync that a peer has been added - self.range_sync.fully_synced_peer_found(); + self.range_sync.fully_synced_peer_found(&mut self.network); } else { // Add the peer to our RangeSync self.range_sync @@ -675,6 +679,10 @@ impl SyncManager { fn update_sync_state(&mut self) { if let Some((old_state, new_state)) = self.network_globals.update_sync_state() { info!(self.log, "Sync state updated"; "old_state" => format!("{}", old_state), "new_state" => format!("{}",new_state)); + // If we have become synced - Subscribe to all the core subnet topics + if new_state == eth2_libp2p::types::SyncState::Synced { + self.network.subscribe_core_topics(); + } } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index caccce4e61..83f4cb64ed 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -113,8 +113,8 @@ impl SyncNetworkContext { debug!(self.log, "Sync reporting peer"; "peer_id" => peer_id.to_string(), "action" => action.to_string()); self.network_send .send(NetworkMessage::ReportPeer { peer_id, action }) - .unwrap_or_else(|_| { - warn!(self.log, "Could not report peer, channel failed"); + .unwrap_or_else(|e| { + warn!(self.log, "Could not report peer, channel failed"; "error"=> e.to_string()); }); } @@ -133,6 +133,14 @@ impl SyncNetworkContext { Ok(request_id) } + pub fn subscribe_core_topics(&mut self) { + self.network_send + .send(NetworkMessage::SubscribeCoreTopics) + .unwrap_or_else(|e| { + warn!(self.log, "Could not subscribe to core topics."; "error" => e.to_string()); + }); + } + fn send_network_msg(&mut self, msg: NetworkMessage) -> Result<(), &'static str> { self.network_send.send(msg).map_err(|_| { debug!(self.log, "Could not send message to the network service"); diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 3ccbb351b7..71e0606703 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -113,7 +113,7 @@ impl ChainCollection { } /// Updates the global sync state and logs any changes. - pub fn update_sync_state(&mut self) { + pub fn update_sync_state(&mut self, network: &mut SyncNetworkContext) { // if there is no range sync occurring, the state is either synced or not based on // connected peers. @@ -130,8 +130,11 @@ impl ChainCollection { let mut peer_state = self.network_globals.sync_state.write(); if new_state != *peer_state { info!(self.log, "Sync state updated"; "old_state" => format!("{}",peer_state), "new_state" => format!("{}",new_state)); + if new_state == SyncState::Synced { + network.subscribe_core_topics(); + } + *peer_state = new_state; } - *peer_state = new_state; } else { // The state is based on a range sync state, update it let mut node_sync_state = self.network_globals.sync_state.write(); @@ -148,12 +151,12 @@ impl ChainCollection { /// /// We could be awaiting a head sync. If we are in the head syncing state, without any head /// chains, then update the state to idle. - pub fn fully_synced_peer_found(&mut self) { + pub fn fully_synced_peer_found(&mut self, network: &mut SyncNetworkContext) { if let RangeSyncState::Head { .. } = self.state { if self.head_chains.is_empty() { // Update the global network state to either synced or stalled. self.state = RangeSyncState::Idle; - self.update_sync_state(); + self.update_sync_state(network); } } } diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 59dfec16de..66339be068 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -98,8 +98,8 @@ impl RangeSync { /// On re-status, a peer that has no head to download indicates that this state can be set to /// idle as there are in fact no head chains to download. This function notifies the chain /// collection that the state can safely be set to idle. - pub fn fully_synced_peer_found(&mut self) { - self.chains.fully_synced_peer_found() + pub fn fully_synced_peer_found(&mut self, network: &mut SyncNetworkContext) { + self.chains.fully_synced_peer_found(network) } /// A useful peer has been added. The SyncManager has identified this peer as needing either @@ -168,7 +168,7 @@ impl RangeSync { // check if the new peer's addition will favour a new syncing chain. self.chains.update(network); // update the global sync state if necessary - self.chains.update_sync_state(); + self.chains.update_sync_state(network); } else { // there is no finalized chain that matches this peer's last finalized target // create a new finalized chain @@ -183,7 +183,7 @@ impl RangeSync { ); self.chains.update(network); // update the global sync state - self.chains.update_sync_state(); + self.chains.update_sync_state(network); } } RangeSyncType::Head => { @@ -229,7 +229,7 @@ impl RangeSync { ); } self.chains.update(network); - self.chains.update_sync_state(); + self.chains.update_sync_state(network); } } } @@ -292,7 +292,7 @@ impl RangeSync { // head chain. self.chains.set_head_sync(); // Update the global variables - self.chains.update_sync_state(); + self.chains.update_sync_state(network); // if there are no more finalized chains, re-status all known peers awaiting a head // sync @@ -329,7 +329,7 @@ impl RangeSync { // update the state of the collection self.chains.update(network); // update the global state and log any change - self.chains.update_sync_state(); + self.chains.update_sync_state(network); } Some((_, ProcessingResult::KeepChain)) => {} None => { @@ -358,7 +358,7 @@ impl RangeSync { // update the state of the collection self.chains.update(network); // update the global state and inform the user - self.chains.update_sync_state(); + self.chains.update_sync_state(network); } /// When a peer gets removed, both the head and finalized chains need to be searched to check which pool the peer is in. The chain may also have a batch or batches awaiting