diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index b6eeeaba5f..816ad4ea43 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -163,55 +163,30 @@ impl PeerManager { /// If the peer doesn't exist, log a warning and insert defaults. pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction) { // Helper function to avoid any potential deadlocks. - let mut ban_peer = None; - let mut unban_peer = None; + let mut to_ban_peers = Vec::with_capacity(1); + let mut to_unban_peers = Vec::with_capacity(1); { let mut peer_db = self.network_globals.peers.write(); if let Some(info) = peer_db.peer_info_mut(peer_id) { let previous_state = info.score_state(); info.apply_peer_action_to_score(action); + Self::handle_score_transitions( + previous_state, + peer_id, + info, + &mut to_ban_peers, + &mut to_unban_peers, + &mut self.events, + &self.log, + ); if previous_state != info.score_state() { - match info.score_state() { - ScoreState::Banned => { - debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string()); - ban_peer = Some(peer_id); - } - ScoreState::Disconnected => { - debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string()); - // disconnect the peer if it's currently connected or dialing - if info.is_connected_or_dialing() { - self.events.push(PeerManagerEvent::DisconnectPeer( - peer_id.clone(), - GoodbyeReason::BadScore, - )); - peer_db.notify_disconnecting(peer_id); - } else if info.is_banned() { - unban_peer = Some(peer_id); - } - } - ScoreState::Healthy => { - debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string()); - // unban the peer if it was previously banned. - if info.is_banned() { - unban_peer = Some(peer_id); - } - } - } - } else { debug!(self.log, "Peer score adjusted"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string()); } } } // end write lock - if let Some(peer_id) = ban_peer { - self.ban_peer(peer_id); - } - if let Some(peer_id) = unban_peer { - if let Err(e) = self.unban_peer(peer_id) { - error!(self.log, "{}", e; "peer_id" => %peer_id); - } - } + self.ban_and_unban_peers(to_ban_peers, to_unban_peers); } /* Discovery Requests */ @@ -528,31 +503,56 @@ impl PeerManager { } pub(crate) fn update_gossipsub_scores(&mut self, gossipsub: &Gossipsub) { - //collect peers with scores - let mut guard = self.network_globals.peers.write(); - let mut peers: Vec<_> = guard - .peers_mut() - .filter_map(|(peer_id, info)| gossipsub.peer_score(peer_id).map(|score| (info, score))) - .collect(); + let mut to_ban_peers = Vec::new(); + let mut to_unban_peers = Vec::new(); - // sort descending by score - peers.sort_unstable_by(|(.., s1), (.., s2)| s2.partial_cmp(s1).unwrap_or(Ordering::Equal)); + { + //collect peers with scores + let mut guard = self.network_globals.peers.write(); + let mut peers: Vec<_> = guard + .peers_mut() + .filter_map(|(peer_id, info)| { + gossipsub + .peer_score(peer_id) + .map(|score| (peer_id, info, score)) + }) + .collect(); - let mut to_ignore_negative_peers = - (self.target_peers as f32 * ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR).ceil() as usize; - for (info, score) in peers { - info.update_gossipsub_score( - score, - if score < 0.0 && to_ignore_negative_peers > 0 { - to_ignore_negative_peers -= 1; - // We ignore the negative score for the best negative peers so that their - // gossipsub score can recover without getting disconnected. - true - } else { - false - }, - ); - } + // sort descending by score + peers.sort_unstable_by(|(.., s1), (.., s2)| { + s2.partial_cmp(s1).unwrap_or(Ordering::Equal) + }); + + let mut to_ignore_negative_peers = + (self.target_peers as f32 * ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR).ceil() as usize; + + for (peer_id, info, score) in peers { + let previous_state = info.score_state(); + info.update_gossipsub_score( + score, + if score < 0.0 && to_ignore_negative_peers > 0 { + to_ignore_negative_peers -= 1; + // We ignore the negative score for the best negative peers so that their + // gossipsub score can recover without getting disconnected. + true + } else { + false + }, + ); + + Self::handle_score_transitions( + previous_state, + peer_id, + info, + &mut to_ban_peers, + &mut to_unban_peers, + &mut self.events, + &self.log, + ); + } + } // end write lock + + self.ban_and_unban_peers(to_ban_peers, to_unban_peers); } /* Internal functions */ @@ -693,6 +693,59 @@ impl PeerManager { true } + fn handle_score_transitions( + previous_state: ScoreState, + peer_id: &PeerId, + info: &mut PeerInfo, + to_ban_peers: &mut Vec, + to_unban_peers: &mut Vec, + events: &mut SmallVec<[PeerManagerEvent; 16]>, + log: &slog::Logger, + ) { + if previous_state != info.score_state() { + match info.score_state() { + ScoreState::Banned => { + debug!(log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string()); + to_ban_peers.push(peer_id.clone()); + } + ScoreState::Disconnected => { + debug!(log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string()); + // disconnect the peer if it's currently connected or dialing + if info.is_connected_or_dialing() { + // Change the state to inform that we are disconnecting the peer. + info.disconnecting(false); + events.push(PeerManagerEvent::DisconnectPeer( + peer_id.clone(), + GoodbyeReason::BadScore, + )); + } else if info.is_banned() { + to_unban_peers.push(peer_id.clone()); + } + } + ScoreState::Healthy => { + debug!(log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string()); + // unban the peer if it was previously banned. + if info.is_banned() { + to_unban_peers.push(peer_id.clone()); + } + } + } + } + } + + fn ban_and_unban_peers(&mut self, to_ban_peers: Vec, to_unban_peers: Vec) { + // process banning peers + for peer_id in to_ban_peers { + self.ban_peer(&peer_id); + } + // process unbanning peers + for peer_id in to_unban_peers { + if let Err(e) = self.unban_peer(&peer_id) { + error!(self.log, "{}", e; "peer_id" => %peer_id); + } + } + } + /// Updates the scores of known peers according to their connection /// status and the time that has passed. /// NOTE: This is experimental and will likely be adjusted @@ -707,47 +760,17 @@ impl PeerManager { // Update scores info.score_update(); - // handle score transitions - if previous_state != info.score_state() { - match info.score_state() { - ScoreState::Banned => { - debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string()); - to_ban_peers.push(peer_id.clone()); - } - ScoreState::Disconnected => { - debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string()); - // disconnect the peer if it's currently connected or dialing - if info.is_connected_or_dialing() { - // Change the state to inform that we are disconnecting the peer. - info.disconnecting(false); - self.events.push(PeerManagerEvent::DisconnectPeer( - peer_id.clone(), - GoodbyeReason::BadScore, - )); - } else if info.is_banned() { - to_unban_peers.push(peer_id.clone()); - } - } - ScoreState::Healthy => { - debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string()); - // unban the peer if it was previously banned. - if info.is_banned() { - to_unban_peers.push(peer_id.clone()); - } - } - } - } - } - // process banning peers - for peer_id in to_ban_peers { - self.ban_peer(&peer_id); - } - // process unbanning peers - for peer_id in to_unban_peers { - if let Err(e) = self.unban_peer(&peer_id) { - error!(self.log, "{}", e; "peer_id" => %peer_id); - } + Self::handle_score_transitions( + previous_state, + peer_id, + info, + &mut to_ban_peers, + &mut to_unban_peers, + &mut self.events, + &self.log, + ); } + self.ban_and_unban_peers(to_ban_peers, to_unban_peers); } /// Bans a peer. diff --git a/beacon_node/eth2_libp2p/src/peer_manager/score.rs b/beacon_node/eth2_libp2p/src/peer_manager/score.rs index 80017bdf8d..71608d8fda 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/score.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/score.rs @@ -78,7 +78,7 @@ impl std::fmt::Display for PeerAction { } /// The expected state of the peer given the peer's score. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone, Copy)] pub(crate) enum ScoreState { /// We are content with the peers performance. We permit connections and messages. Healthy, @@ -267,7 +267,6 @@ macro_rules! apply { } apply!(apply_peer_action, peer_action: PeerAction); -apply!(add, delta: f64); apply!(update); apply!(update_gossipsub_score, new_score: f64, ignore: bool); #[cfg(test)] @@ -336,25 +335,25 @@ mod tests { // 0 change does not change de reputation // let change = 0.0; - score.add(change); + score.test_add(change); assert_eq!(score.score(), DEFAULT_SCORE); // underflowing change is capped let mut score = Score::default(); let change = MIN_SCORE - 50.0; - score.add(change); + score.test_add(change); assert_eq!(score.score(), MIN_SCORE); // overflowing change is capped let mut score = Score::default(); let change = MAX_SCORE + 50.0; - score.add(change); + score.test_add(change); assert_eq!(score.score(), MAX_SCORE); // Score adjusts let mut score = Score::default(); let change = 1.32; - score.add(change); + score.test_add(change); assert_eq!(score.score(), DEFAULT_SCORE + change); } @@ -364,7 +363,7 @@ mod tests { let now = Instant::now(); let change = MIN_SCORE_BEFORE_BAN; - score.add(change); + score.test_add(change); assert_eq!(score.score(), MIN_SCORE_BEFORE_BAN); score.update_at(now + BANNED_BEFORE_DECAY); @@ -381,7 +380,7 @@ mod tests { assert!(!score.is_good_gossipsub_peer()); assert!(score.score() < 0.0); assert_eq!(score.state(), ScoreState::Healthy); - score.add(-1.0001); + score.test_add(-1.0001); assert_eq!(score.state(), ScoreState::Disconnected); }