From a6d2ed6119c45b83c5fea79b67c3c48f3c81c156 Mon Sep 17 00:00:00 2001 From: Akihito Nakano Date: Mon, 6 Jun 2022 05:51:10 +0000 Subject: [PATCH] Fix: PeerManager doesn't remove "outbound only" peers which should be pruned (#3236) ## Issue Addressed This is one step to address https://github.com/sigp/lighthouse/issues/3092 before introducing `quickcheck`. I noticed an issue while I was reading the pruning implementation `PeerManager::prune_excess_peers()`. If a peer meets the following conditions, the **`outbound_peers_pruned` counter increases but the peer is not pushed to `peers_to_prune`**. - [outbound only](https://github.com/sigp/lighthouse/blob/1e4ac8a4b9dec645af23af811475b4e4c95c69ee/beacon_node/lighthouse_network/src/peer_manager/mod.rs#L1018) - [min_subnet_count <= MIN_SYNC_COMMITTEE_PEERS](https://github.com/sigp/lighthouse/blob/1e4ac8a4b9dec645af23af811475b4e4c95c69ee/beacon_node/lighthouse_network/src/peer_manager/mod.rs#L1047) As a result, PeerManager doesn't remove "outbound" peers which should be pruned. Note: [`subnet_to_peer`](https://github.com/sigp/lighthouse/blob/e0d673ea86ac0f6dab3ddd92b0de06ce5eacf8c0/beacon_node/lighthouse_network/src/peer_manager/mod.rs#L999) (HashMap) doesn't guarantee a particular order of iteration. So whether the test fails depends on the order of iteration. 
--- .../src/peer_manager/mod.rs | 192 ++++++++++++++++-- 1 file changed, 179 insertions(+), 13 deletions(-) diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 85c0ddd950..9c8d41194c 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -1015,20 +1015,17 @@ impl PeerManager { let mut removed_peer_index = None; for (index, (candidate_peer, info)) in peers_on_subnet.iter().enumerate() { // Ensure we don't remove too many outbound peers - if info.is_outbound_only() { - if self.target_outbound_peers() - < connected_outbound_peer_count + if info.is_outbound_only() + && self.target_outbound_peers() + >= connected_outbound_peer_count .saturating_sub(outbound_peers_pruned) - { - outbound_peers_pruned += 1; - } else { - // Restart the main loop with the outbound peer removed from - // the list. This will lower the peers per subnet count and - // potentially a new subnet may be chosen to remove peers. This - // can occur recursively until we have no peers left to choose - // from. - continue; - } + { + // Restart the main loop with the outbound peer removed from + // the list. This will lower the peers per subnet count and + // potentially a new subnet may be chosen to remove peers. This + // can occur recursively until we have no peers left to choose + // from. 
+ continue; } // Check the sync committee @@ -1051,6 +1048,9 @@ impl PeerManager { } } + if info.is_outbound_only() { + outbound_peers_pruned += 1; + } // This peer is suitable to be pruned removed_peer_index = Some(index); break; @@ -1885,4 +1885,170 @@ mod tests { assert!(!connected_peers.contains(&peers[1])); assert!(!connected_peers.contains(&peers[2])); } + + /// This test is for reproducing the issue: + /// https://github.com/sigp/lighthouse/pull/3236#issue-1256432659 + /// + /// Whether the issue happens depends on `subnet_to_peer` (HashMap), since HashMap doesn't + /// guarantee a particular order of iteration. So we repeat the test case to try to reproduce + /// the issue. + #[tokio::test] + async fn test_peer_manager_prune_based_on_subnet_count_repeat() { + for _ in 0..100 { + test_peer_manager_prune_based_on_subnet_count().await; + } + } + + /// Test the pruning logic to prioritize peers with the most subnets. This test specifies + /// the connection direction for the peers. + /// Either Peer 4 or 5 is expected to be removed in this test case. + /// + /// Create 8 peers. + /// Peer0 (out) : Subnet 1, Sync-committee-1 + /// Peer1 (out) : Subnet 1, Sync-committee-1 + /// Peer2 (out) : Subnet 2, Sync-committee-2 + /// Peer3 (out) : Subnet 2, Sync-committee-2 + /// Peer4 (out) : Subnet 3 + /// Peer5 (out) : Subnet 3 + /// Peer6 (in) : Subnet 4 + /// Peer7 (in) : Subnet 5 + async fn test_peer_manager_prune_based_on_subnet_count() { + let target = 7; + let mut peer_manager = build_peer_manager(target).await; + + // Create 8 peers to connect to. 
+ let mut peers = Vec::new(); + for x in 0..8 { + let peer = PeerId::random(); + + // Have some of the peers be on a long-lived subnet + let mut attnets = crate::types::EnrAttestationBitfield::::new(); + let mut syncnets = crate::types::EnrSyncCommitteeBitfield::::new(); + + match x { + 0 => { + peer_manager.inject_connect_outgoing( + &peer, + "/ip4/0.0.0.0".parse().unwrap(), + None, + ); + attnets.set(1, true).unwrap(); + syncnets.set(1, true).unwrap(); + } + 1 => { + peer_manager.inject_connect_outgoing( + &peer, + "/ip4/0.0.0.0".parse().unwrap(), + None, + ); + attnets.set(1, true).unwrap(); + syncnets.set(1, true).unwrap(); + } + 2 => { + peer_manager.inject_connect_outgoing( + &peer, + "/ip4/0.0.0.0".parse().unwrap(), + None, + ); + attnets.set(2, true).unwrap(); + syncnets.set(2, true).unwrap(); + } + 3 => { + peer_manager.inject_connect_outgoing( + &peer, + "/ip4/0.0.0.0".parse().unwrap(), + None, + ); + attnets.set(2, true).unwrap(); + syncnets.set(2, true).unwrap(); + } + 4 => { + peer_manager.inject_connect_outgoing( + &peer, + "/ip4/0.0.0.0".parse().unwrap(), + None, + ); + attnets.set(3, true).unwrap(); + } + 5 => { + peer_manager.inject_connect_outgoing( + &peer, + "/ip4/0.0.0.0".parse().unwrap(), + None, + ); + attnets.set(3, true).unwrap(); + } + 6 => { + peer_manager.inject_connect_ingoing( + &peer, + "/ip4/0.0.0.0".parse().unwrap(), + None, + ); + attnets.set(4, true).unwrap(); + } + 7 => { + peer_manager.inject_connect_ingoing( + &peer, + "/ip4/0.0.0.0".parse().unwrap(), + None, + ); + attnets.set(5, true).unwrap(); + } + _ => unreachable!(), + } + + let metadata = crate::rpc::MetaDataV2 { + seq_number: 0, + attnets, + syncnets, + }; + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&peer) + .unwrap() + .set_meta_data(MetaData::V2(metadata)); + let long_lived_subnets = peer_manager + .network_globals + .peers + .read() + .peer_info(&peer) + .unwrap() + .long_lived_subnets(); + println!("{},{}", x, peer); + for subnet in 
long_lived_subnets { + println!("Subnet: {:?}", subnet); + peer_manager + .network_globals + .peers + .write() + .add_subscription(&peer, subnet); + } + peers.push(peer); + } + + // Perform the heartbeat. + peer_manager.heartbeat(); + + // Tests that when we are over the target peer limit, after disconnecting an unhealthy peer, + // the number of connected peers updates and we will not remove too many peers. + assert_eq!( + peer_manager.network_globals.connected_or_dialing_peers(), + target + ); + + let connected_peers: std::collections::HashSet<_> = peer_manager + .network_globals + .peers + .read() + .connected_or_dialing_peers() + .cloned() + .collect(); + + // Either peer 4 or 5 should be removed. + // Check that we keep 6 and 7 peers, which we have few on a particular subnet. + assert!(connected_peers.contains(&peers[6])); + assert!(connected_peers.contains(&peers[7])); + } }