From 80e52a026370d238957a3307e214689c8859f0c0 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 23 Sep 2020 03:26:33 +0000 Subject: [PATCH] Subscribe to core topics after sync (#1613) ## Issue Addressed N/A ## Proposed Changes Prevent subscribing to core gossipsub topics until after we have achieved a full sync. This prevents us censoring gossipsub channels, getting penalised in gossipsub 1.1 scoring and saves us computation time in attempting to validate gossipsub messages which we will be unable to do with a non-sync'd chain. --- beacon_node/eth2_libp2p/src/config.rs | 11 +---------- beacon_node/eth2_libp2p/src/service.rs | 4 +++- beacon_node/eth2_libp2p/src/types/mod.rs | 2 +- beacon_node/eth2_libp2p/src/types/topics.rs | 8 ++++++++ beacon_node/network/src/service.rs | 18 ++++++++++++++++++ beacon_node/network/src/sync/manager.rs | 16 ++++++++++++---- .../network/src/sync/network_context.rs | 12 ++++++++++-- .../src/sync/range_sync/chain_collection.rs | 11 +++++++---- .../network/src/sync/range_sync/range.rs | 16 ++++++++-------- 9 files changed, 68 insertions(+), 30 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/config.rs b/beacon_node/eth2_libp2p/src/config.rs index b2ee8558cf..73094642d7 100644 --- a/beacon_node/eth2_libp2p/src/config.rs +++ b/beacon_node/eth2_libp2p/src/config.rs @@ -78,15 +78,6 @@ impl Default for Config { network_dir.push(".lighthouse"); network_dir.push("network"); - // The default topics that we will initially subscribe to - let topics = vec![ - GossipKind::BeaconBlock, - GossipKind::BeaconAggregateAndProof, - GossipKind::VoluntaryExit, - GossipKind::ProposerSlashing, - GossipKind::AttesterSlashing, - ]; - // The function used to generate a gossipsub message id // We use the first 8 bytes of SHA256(data) for content addressing let gossip_message_id = @@ -145,7 +136,7 @@ impl Default for Config { trusted_peers: vec![], client_version: lighthouse_version::version_with_platform(), disable_discovery: false, - topics, + topics: Vec::new(), } } } diff --git a/beacon_node/eth2_libp2p/src/service.rs b/beacon_node/eth2_libp2p/src/service.rs index 1d594918d5..52286c05d3 100644 --- a/beacon_node/eth2_libp2p/src/service.rs +++ b/beacon_node/eth2_libp2p/src/service.rs @@ -207,7 +207,9 @@ impl Service { warn!(log, "Could not subscribe to topic"; "topic" => format!("{}",topic_kind)); } } - info!(log, "Subscribed to topics"; "topics" => format!("{:?}", subscribed_topics)); + if !subscribed_topics.is_empty() { + info!(log, "Subscribed to topics"; "topics" => format!("{:?}", subscribed_topics)); + } let service = Service { local_peer_id, diff --git a/beacon_node/eth2_libp2p/src/types/mod.rs b/beacon_node/eth2_libp2p/src/types/mod.rs index ec6fcd4af4..762ea7d740 100644 --- a/beacon_node/eth2_libp2p/src/types/mod.rs +++ b/beacon_node/eth2_libp2p/src/types/mod.rs @@ -16,4 +16,4 @@ pub use globals::NetworkGlobals; pub use pubsub::PubsubMessage; pub use subnet::SubnetDiscovery; pub use sync_state::SyncState; -pub use topics::{GossipEncoding, GossipKind, GossipTopic}; +pub use topics::{GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS}; diff --git a/beacon_node/eth2_libp2p/src/types/topics.rs b/beacon_node/eth2_libp2p/src/types/topics.rs index f564a54137..3f120b3ec1 100644 --- a/beacon_node/eth2_libp2p/src/types/topics.rs +++ b/beacon_node/eth2_libp2p/src/types/topics.rs @@ -14,6 +14,14 @@ pub const VOLUNTARY_EXIT_TOPIC: &str = "voluntary_exit"; pub const PROPOSER_SLASHING_TOPIC: &str = "proposer_slashing"; pub const ATTESTER_SLASHING_TOPIC: &str = "attester_slashing"; +pub const CORE_TOPICS: [GossipKind; 5] = [ + GossipKind::BeaconBlock, + GossipKind::BeaconAggregateAndProof, + GossipKind::VoluntaryExit, + GossipKind::ProposerSlashing, + GossipKind::AttesterSlashing, +]; + /// A gossipsub topic which encapsulates the type of messages that should be sent and received over /// the pubsub protocol and the way the messages should be encoded. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 84f807007f..a018750f3f 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -35,6 +35,9 @@ pub enum NetworkMessage { Subscribe { subscriptions: Vec, }, + /// Subscribes the beacon node to the core gossipsub topics. We do this when we are either + /// synced or close to the head slot. + SubscribeCoreTopics, /// Send an RPC request to the libp2p service. SendRequest { peer_id: PeerId, @@ -278,6 +281,21 @@ fn spawn_service( warn!(service.log, "Validator subscription failed"; "error" => e); } } + NetworkMessage::SubscribeCoreTopics => { + let mut subscribed_topics: Vec = vec![]; + let already_subscribed = service.network_globals.gossipsub_subscriptions.read().clone(); + let already_subscribed = already_subscribed.iter().map(|x| x.kind()).collect::>(); + for topic_kind in eth2_libp2p::types::CORE_TOPICS.iter().filter(|topic| already_subscribed.get(topic).is_none()) { + if service.libp2p.swarm.subscribe_kind(topic_kind.clone()) { + subscribed_topics.push(topic_kind.clone()); + } else { + warn!(service.log, "Could not subscribe to topic"; "topic" => format!("{}",topic_kind)); + } + } + if !subscribed_topics.is_empty() { + info!(service.log, "Subscribed to topics"; "topics" => format!("{:?}", subscribed_topics)); + } + } } } // process any attestation service events diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 3aa5577d70..f147944478 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -57,7 +57,11 @@ use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a /// fully sync'd peer. -pub const SLOT_IMPORT_TOLERANCE: usize = 20; +/// +/// This means that we consider ourselves synced (and hence subscribe to all subnets and block +/// gossip if no peers are further than this range ahead of us that we have not already downloaded +/// blocks for. +pub const SLOT_IMPORT_TOLERANCE: usize = 32; /// How many attempts we try to find a parent of a block before we give up trying . const PARENT_FAIL_TOLERANCE: usize = 5; /// The maximum depth we will search for a parent block. In principle we should have sync'd any @@ -137,7 +141,7 @@ struct ParentRequests { failed_attempts: usize, /// The peer who last submitted a block. If the chain ends or fails, this is the peer that is - /// downvoted. + /// penalized. last_submitted_peer: PeerId, /// The request ID of this lookup is in progress. @@ -277,7 +281,7 @@ impl SyncManager { ); self.synced_peer(&peer_id, remote); // notify the range sync that a peer has been added - self.range_sync.fully_synced_peer_found(); + self.range_sync.fully_synced_peer_found(&mut self.network); } PeerSyncType::Advanced => { trace!(self.log, "Useful peer for sync found"; @@ -303,7 +307,7 @@ impl SyncManager { { self.synced_peer(&peer_id, remote); // notify the range sync that a peer has been added - self.range_sync.fully_synced_peer_found(); + self.range_sync.fully_synced_peer_found(&mut self.network); } else { // Add the peer to our RangeSync self.range_sync @@ -675,6 +679,10 @@ impl SyncManager { fn update_sync_state(&mut self) { if let Some((old_state, new_state)) = self.network_globals.update_sync_state() { info!(self.log, "Sync state updated"; "old_state" => format!("{}", old_state), "new_state" => format!("{}",new_state)); + // If we have become synced - Subscribe to all the core subnet topics + if new_state == eth2_libp2p::types::SyncState::Synced { + self.network.subscribe_core_topics(); + } } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index caccce4e61..83f4cb64ed 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -113,8 +113,8 @@ impl SyncNetworkContext { debug!(self.log, "Sync reporting peer"; "peer_id" => peer_id.to_string(), "action" => action.to_string()); self.network_send .send(NetworkMessage::ReportPeer { peer_id, action }) - .unwrap_or_else(|_| { - warn!(self.log, "Could not report peer, channel failed"); + .unwrap_or_else(|e| { + warn!(self.log, "Could not report peer, channel failed"; "error"=> e.to_string()); }); } @@ -133,6 +133,14 @@ impl SyncNetworkContext { Ok(request_id) } + pub fn subscribe_core_topics(&mut self) { + self.network_send + .send(NetworkMessage::SubscribeCoreTopics) + .unwrap_or_else(|e| { + warn!(self.log, "Could not subscribe to core topics."; "error" => e.to_string()); + }); + } + fn send_network_msg(&mut self, msg: NetworkMessage) -> Result<(), &'static str> { self.network_send.send(msg).map_err(|_| { debug!(self.log, "Could not send message to the network service"); diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 3ccbb351b7..71e0606703 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -113,7 +113,7 @@ impl ChainCollection { } /// Updates the global sync state and logs any changes. - pub fn update_sync_state(&mut self) { + pub fn update_sync_state(&mut self, network: &mut SyncNetworkContext) { // if there is no range sync occurring, the state is either synced or not based on // connected peers. @@ -130,8 +130,11 @@ impl ChainCollection { let mut peer_state = self.network_globals.sync_state.write(); if new_state != *peer_state { info!(self.log, "Sync state updated"; "old_state" => format!("{}",peer_state), "new_state" => format!("{}",new_state)); + if new_state == SyncState::Synced { + network.subscribe_core_topics(); + } + *peer_state = new_state; } - *peer_state = new_state; } else { // The state is based on a range sync state, update it let mut node_sync_state = self.network_globals.sync_state.write(); @@ -148,12 +151,12 @@ impl ChainCollection { /// /// We could be awaiting a head sync. If we are in the head syncing state, without any head /// chains, then update the state to idle. - pub fn fully_synced_peer_found(&mut self) { + pub fn fully_synced_peer_found(&mut self, network: &mut SyncNetworkContext) { if let RangeSyncState::Head { .. } = self.state { if self.head_chains.is_empty() { // Update the global network state to either synced or stalled. self.state = RangeSyncState::Idle; - self.update_sync_state(); + self.update_sync_state(network); } } } diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 59dfec16de..66339be068 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -98,8 +98,8 @@ impl RangeSync { /// On re-status, a peer that has no head to download indicates that this state can be set to /// idle as there are in fact no head chains to download. This function notifies the chain /// collection that the state can safely be set to idle. - pub fn fully_synced_peer_found(&mut self) { - self.chains.fully_synced_peer_found() + pub fn fully_synced_peer_found(&mut self, network: &mut SyncNetworkContext) { + self.chains.fully_synced_peer_found(network) } /// A useful peer has been added. The SyncManager has identified this peer as needing either @@ -168,7 +168,7 @@ impl RangeSync { // check if the new peer's addition will favour a new syncing chain. self.chains.update(network); // update the global sync state if necessary - self.chains.update_sync_state(); + self.chains.update_sync_state(network); } else { // there is no finalized chain that matches this peer's last finalized target // create a new finalized chain @@ -183,7 +183,7 @@ impl RangeSync { ); self.chains.update(network); // update the global sync state - self.chains.update_sync_state(); + self.chains.update_sync_state(network); } } RangeSyncType::Head => { @@ -229,7 +229,7 @@ impl RangeSync { ); } self.chains.update(network); - self.chains.update_sync_state(); + self.chains.update_sync_state(network); } } } @@ -292,7 +292,7 @@ impl RangeSync { // head chain. self.chains.set_head_sync(); // Update the global variables - self.chains.update_sync_state(); + self.chains.update_sync_state(network); // if there are no more finalized chains, re-status all known peers awaiting a head // sync @@ -329,7 +329,7 @@ impl RangeSync { // update the state of the collection self.chains.update(network); // update the global state and log any change - self.chains.update_sync_state(); + self.chains.update_sync_state(network); } Some((_, ProcessingResult::KeepChain)) => {} None => { @@ -358,7 +358,7 @@ impl RangeSync { // update the state of the collection self.chains.update(network); // update the global state and inform the user - self.chains.update_sync_state(); + self.chains.update_sync_state(network); } /// When a peer gets removed, both the head and finalized chains need to be searched to check which pool the peer is in. The chain may also have a batch or batches awaiting