diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs
index 85c0ddd950..9c8d41194c 100644
--- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs
+++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs
@@ -1015,20 +1015,17 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
                         let mut removed_peer_index = None;
                         for (index, (candidate_peer, info)) in peers_on_subnet.iter().enumerate() {
                             // Ensure we don't remove too many outbound peers
-                            if info.is_outbound_only() {
-                                if self.target_outbound_peers()
-                                    < connected_outbound_peer_count
+                            if info.is_outbound_only()
+                                && self.target_outbound_peers()
+                                    >= connected_outbound_peer_count
                                         .saturating_sub(outbound_peers_pruned)
-                                {
-                                    outbound_peers_pruned += 1;
-                                } else {
-                                    // Restart the main loop with the outbound peer removed from
-                                    // the list. This will lower the peers per subnet count and
-                                    // potentially a new subnet may be chosen to remove peers. This
-                                    // can occur recursively until we have no peers left to choose
-                                    // from.
-                                    continue;
-                                }
+                            {
+                                // Restart the main loop with the outbound peer removed from
+                                // the list. This will lower the peers per subnet count and
+                                // potentially a new subnet may be chosen to remove peers. This
+                                // can occur recursively until we have no peers left to choose
+                                // from.
+                                continue;
                             }
 
                             // Check the sync committee
@@ -1051,6 +1048,9 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
                                 }
                             }
 
+                            if info.is_outbound_only() {
+                                outbound_peers_pruned += 1;
+                            }
                             // This peer is suitable to be pruned
                             removed_peer_index = Some(index);
                             break;
@@ -1885,4 +1885,170 @@ mod tests {
         assert!(!connected_peers.contains(&peers[1]));
         assert!(!connected_peers.contains(&peers[2]));
     }
+
+    /// This test attempts to reproduce the issue described in:
+    /// https://github.com/sigp/lighthouse/pull/3236#issue-1256432659
+    ///
+    /// Whether the issue occurs depends on the iteration order of `subnet_to_peer` (a HashMap),
+    /// which is not guaranteed, so we repeat the test case to increase the chance of reproducing
+    /// the issue.
+    #[tokio::test]
+    async fn test_peer_manager_prune_based_on_subnet_count_repeat() {
+        for _ in 0..100 {
+            test_peer_manager_prune_based_on_subnet_count().await;
+        }
+    }
+
+    /// Tests that the pruning logic prioritizes keeping peers subscribed to the most subnets.
+    /// This test specifies the connection direction for each peer.
+    /// Either Peer 4 or 5 is expected to be removed in this test case.
+    ///
+    /// Create 8 peers.
+    /// Peer0 (out) : Subnet 1, Sync-committee-1
+    /// Peer1 (out) : Subnet 1, Sync-committee-1
+    /// Peer2 (out) : Subnet 2, Sync-committee-2
+    /// Peer3 (out) : Subnet 2, Sync-committee-2
+    /// Peer4 (out) : Subnet 3
+    /// Peer5 (out) : Subnet 3
+    /// Peer6 (in) : Subnet 4
+    /// Peer7 (in) : Subnet 5
+    async fn test_peer_manager_prune_based_on_subnet_count() {
+        let target = 7;
+        let mut peer_manager = build_peer_manager(target).await;
+
+        // Create 8 peers to connect to.
+        let mut peers = Vec::new();
+        for x in 0..8 {
+            let peer = PeerId::random();
+
+            // Have some of the peers be on a long-lived subnet
+            let mut attnets = crate::types::EnrAttestationBitfield::<E>::new();
+            let mut syncnets = crate::types::EnrSyncCommitteeBitfield::<E>::new();
+
+            match x {
+                0 => {
+                    peer_manager.inject_connect_outgoing(
+                        &peer,
+                        "/ip4/0.0.0.0".parse().unwrap(),
+                        None,
+                    );
+                    attnets.set(1, true).unwrap();
+                    syncnets.set(1, true).unwrap();
+                }
+                1 => {
+                    peer_manager.inject_connect_outgoing(
+                        &peer,
+                        "/ip4/0.0.0.0".parse().unwrap(),
+                        None,
+                    );
+                    attnets.set(1, true).unwrap();
+                    syncnets.set(1, true).unwrap();
+                }
+                2 => {
+                    peer_manager.inject_connect_outgoing(
+                        &peer,
+                        "/ip4/0.0.0.0".parse().unwrap(),
+                        None,
+                    );
+                    attnets.set(2, true).unwrap();
+                    syncnets.set(2, true).unwrap();
+                }
+                3 => {
+                    peer_manager.inject_connect_outgoing(
+                        &peer,
+                        "/ip4/0.0.0.0".parse().unwrap(),
+                        None,
+                    );
+                    attnets.set(2, true).unwrap();
+                    syncnets.set(2, true).unwrap();
+                }
+                4 => {
+                    peer_manager.inject_connect_outgoing(
+                        &peer,
+                        "/ip4/0.0.0.0".parse().unwrap(),
+                        None,
+                    );
+                    attnets.set(3, true).unwrap();
+                }
+                5 => {
+                    peer_manager.inject_connect_outgoing(
+                        &peer,
+                        "/ip4/0.0.0.0".parse().unwrap(),
+                        None,
+                    );
+                    attnets.set(3, true).unwrap();
+                }
+                6 => {
+                    peer_manager.inject_connect_ingoing(
+                        &peer,
+                        "/ip4/0.0.0.0".parse().unwrap(),
+                        None,
+                    );
+                    attnets.set(4, true).unwrap();
+                }
+                7 => {
+                    peer_manager.inject_connect_ingoing(
+                        &peer,
+                        "/ip4/0.0.0.0".parse().unwrap(),
+                        None,
+                    );
+                    attnets.set(5, true).unwrap();
+                }
+                _ => unreachable!(),
+            }
+
+            let metadata = crate::rpc::MetaDataV2 {
+                seq_number: 0,
+                attnets,
+                syncnets,
+            };
+            peer_manager
+                .network_globals
+                .peers
+                .write()
+                .peer_info_mut(&peer)
+                .unwrap()
+                .set_meta_data(MetaData::V2(metadata));
+            let long_lived_subnets = peer_manager
+                .network_globals
+                .peers
+                .read()
+                .peer_info(&peer)
+                .unwrap()
+                .long_lived_subnets();
+            println!("{},{}", x, peer);
+            for subnet in long_lived_subnets {
+                println!("Subnet: {:?}", subnet);
+                peer_manager
+                    .network_globals
+                    .peers
+                    .write()
+                    .add_subscription(&peer, subnet);
+            }
+            peers.push(peer);
+        }
+
+        // Perform the heartbeat.
+        peer_manager.heartbeat();
+
+        // Tests that, when we are over the target peer limit, pruning reduces the number of
+        // connected peers to the target without removing too many peers.
+        assert_eq!(
+            peer_manager.network_globals.connected_or_dialing_peers(),
+            target
+        );
+
+        let connected_peers: std::collections::HashSet<_> = peer_manager
+            .network_globals
+            .peers
+            .read()
+            .connected_or_dialing_peers()
+            .cloned()
+            .collect();
+
+        // Either peer 4 or 5 should be removed.
+        // Check that we keep peers 6 and 7, since we have few peers on their subnets.
+        assert!(connected_peers.contains(&peers[6]));
+        assert!(connected_peers.contains(&peers[7]));
+    }
 }
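The change in the first two hunks is easier to see outside of Lighthouse's types. Below is a minimal, self-contained sketch, assuming a hypothetical `Candidate` struct and `choose_peer_to_prune` function (not Lighthouse's actual API): the outbound-limit guard skips a candidate with `continue`, and `outbound_peers_pruned` is incremented only at the point where a peer is definitively chosen for pruning, rather than for candidates that may later be skipped by the sync-committee check.

```rust
// Minimal sketch with hypothetical names; not Lighthouse's actual types or API.

struct Candidate {
    outbound_only: bool,
}

/// Pick at most one candidate to prune. Mirrors the reordered checks in the diff:
/// an outbound-only candidate is skipped with `continue` while removing it would take us
/// to or below the outbound target, and `outbound_peers_pruned` is incremented only once
/// a peer has actually been selected for pruning.
fn choose_peer_to_prune(
    candidates: &[Candidate],
    target_outbound_peers: usize,
    connected_outbound_peers: usize,
    outbound_peers_pruned: &mut usize,
) -> Option<usize> {
    for (index, candidate) in candidates.iter().enumerate() {
        // Ensure we don't remove too many outbound peers.
        if candidate.outbound_only
            && target_outbound_peers
                >= connected_outbound_peers.saturating_sub(*outbound_peers_pruned)
        {
            // We are already at (or below) the outbound target, so removing this peer
            // would leave too few outbound peers; try the next candidate instead.
            continue;
        }

        // (The real code also checks long-lived sync-committee membership here.)

        // Only count the peer against the outbound budget once it is actually selected.
        // Incrementing earlier, for candidates that could still be skipped, was the
        // accounting error the patch removes.
        if candidate.outbound_only {
            *outbound_peers_pruned += 1;
        }
        return Some(index);
    }
    None
}

fn main() {
    let candidates = vec![
        Candidate { outbound_only: true },
        Candidate { outbound_only: true },
    ];
    let mut outbound_peers_pruned = 0;
    // Six outbound peers connected with a target of three: the first candidate can be pruned.
    let chosen = choose_peer_to_prune(&candidates, 3, 6, &mut outbound_peers_pruned);
    println!("chosen: {:?}, outbound pruned: {}", chosen, outbound_peers_pruned);
}
```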