Move the peer manager to be a behaviour (#2773)

This simply moves some functions that were "swarm notifications" to a network behaviour implementation.

Notes
------
- We could disconnect from the peer manager but we would lose the rpc shutdown message
- We still notify from the swarm since this is the most reliable way to get some events. Ugly but best for now
- Events need to be pushed with "add event" to wake the waker

Co-authored-by: Divma <26765164+divagant-martian@users.noreply.github.com>
This commit is contained in:
Divma
2021-11-08 00:01:10 +00:00
parent df02639b71
commit fbafe416d1
6 changed files with 336 additions and 314 deletions

View File

@@ -2,23 +2,17 @@
use crate::discovery::TARGET_SUBNET_PEERS;
use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode};
use crate::types::SyncState;
use crate::{error, metrics, Gossipsub};
use crate::{NetworkGlobals, PeerId};
use crate::{Subnet, SubnetDiscovery};
use discv5::Enr;
use futures::prelude::*;
use futures::Stream;
use hashset_delay::HashSetDelay;
use libp2p::core::ConnectedPoint;
use libp2p::identify::IdentifyInfo;
use peerdb::{BanOperation, BanResult, ScoreUpdateResult};
use slog::{debug, error, warn};
use smallvec::SmallVec;
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};
use types::{EthSpec, SyncSubnetId};
@@ -36,6 +30,7 @@ pub use peerdb::sync_status::{SyncInfo, SyncStatus};
use std::collections::{hash_map::Entry, HashMap};
use std::net::IpAddr;
pub mod config;
mod network_behaviour;
/// The heartbeat performs regular updates such as updating reputations and performing discovery
/// requests. This defines the interval in seconds.
@@ -81,6 +76,7 @@ pub struct PeerManager<TSpec: EthSpec> {
}
/// The events that the `PeerManager` outputs (requests).
#[derive(Debug)]
pub enum PeerManagerEvent {
/// A peer has dialed us.
PeerConnectedIncoming(PeerId),
@@ -341,147 +337,6 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
self.inject_peer_connection(peer_id, ConnectingType::Dialing, enr);
}
pub fn inject_connection_established(
&mut self,
peer_id: PeerId,
endpoint: ConnectedPoint,
num_established: std::num::NonZeroU32,
enr: Option<Enr>,
) {
// Log the connection
match &endpoint {
ConnectedPoint::Listener { .. } => {
debug!(self.log, "Connection established"; "peer_id" => %peer_id, "connection" => "Incoming", "connections" => %num_established);
}
ConnectedPoint::Dialer { .. } => {
debug!(self.log, "Connection established"; "peer_id" => %peer_id, "connection" => "Outgoing", "connections" => %num_established);
}
}
// Check to make sure the peer is not supposed to be banned
match self.ban_status(&peer_id) {
BanResult::BadScore => {
// This is a faulty state
error!(self.log, "Connected to a banned peer, re-banning"; "peer_id" => %peer_id);
// Reban the peer
self.goodbye_peer(&peer_id, GoodbyeReason::Banned, ReportSource::PeerManager);
return;
}
BanResult::BannedIp(ip_addr) => {
// A good peer has connected to us via a banned IP address. We ban the peer and
// prevent future connections.
debug!(self.log, "Peer connected via banned IP. Banning"; "peer_id" => %peer_id, "banned_ip" => %ip_addr);
self.goodbye_peer(&peer_id, GoodbyeReason::BannedIP, ReportSource::PeerManager);
return;
}
BanResult::NotBanned => {}
}
// Check the connection limits
if self.peer_limit_reached()
&& self
.network_globals
.peers
.read()
.peer_info(&peer_id)
.map_or(true, |peer| !peer.has_future_duty())
{
// Gracefully disconnect the peer.
self.disconnect_peer(peer_id, GoodbyeReason::TooManyPeers);
return;
}
// Register the newly connected peer (regardless if we are about to disconnect them).
// NOTE: We don't register peers that we are disconnecting immediately. The network service
// does not need to know about these peers.
match endpoint {
ConnectedPoint::Listener { send_back_addr, .. } => {
self.inject_connect_ingoing(&peer_id, send_back_addr, enr);
if num_established == std::num::NonZeroU32::new(1).expect("valid") {
self.events
.push(PeerManagerEvent::PeerConnectedIncoming(peer_id));
}
}
ConnectedPoint::Dialer { address } => {
self.inject_connect_outgoing(&peer_id, address, enr);
if num_established == std::num::NonZeroU32::new(1).expect("valid") {
self.events
.push(PeerManagerEvent::PeerConnectedOutgoing(peer_id));
}
}
}
let connected_peers = self.network_globals.connected_peers() as i64;
// increment prometheus metrics
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peers);
metrics::set_gauge(&metrics::PEERS_CONNECTED_INTEROP, connected_peers);
}
pub fn inject_connection_closed(
&mut self,
peer_id: PeerId,
_endpoint: ConnectedPoint,
num_established: u32,
) {
if num_established == 0 {
// There are no more connections
if self
.network_globals
.peers
.read()
.is_connected_or_disconnecting(&peer_id)
{
// We are disconnecting the peer or the peer has already been connected.
// Both these cases, the peer has been previously registered by the peer manager and
// potentially the application layer.
// Inform the application.
self.events
.push(PeerManagerEvent::PeerDisconnected(peer_id));
debug!(self.log, "Peer disconnected"; "peer_id" => %peer_id);
// Decrement the PEERS_PER_CLIENT metric
if let Some(kind) = self
.network_globals
.peers
.read()
.peer_info(&peer_id)
.map(|info| info.client().kind.clone())
{
if let Some(v) =
metrics::get_int_gauge(&metrics::PEERS_PER_CLIENT, &[&kind.to_string()])
{
v.dec()
};
}
}
// NOTE: It may be the case that a rejected node, due to too many peers is disconnected
// here and the peer manager has no knowledge of its connection. We insert it here for
// reference so that peer manager can track this peer.
self.inject_disconnect(&peer_id);
let connected_peers = self.network_globals.connected_peers() as i64;
// Update the prometheus metrics
metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT);
metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peers);
metrics::set_gauge(&metrics::PEERS_CONNECTED_INTEROP, connected_peers);
}
}
/// A dial attempt has failed.
///
/// NOTE: It can be the case that we are dialing a peer and during the dialing process the peer
/// connects and the dial attempt later fails. To handle this, we only update the peer_db if
/// the peer is not already connected.
pub fn inject_dial_failure(&mut self, peer_id: &PeerId) {
if !self.network_globals.peers.read().is_connected(peer_id) {
self.inject_disconnect(peer_id);
}
}
/// Reports if a peer is banned or not.
///
/// This is used to determine if we should accept incoming connections.
@@ -973,70 +828,6 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
}
}
impl<TSpec: EthSpec> Stream for PeerManager<TSpec> {
type Item = PeerManagerEvent;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// perform the heartbeat when necessary
while self.heartbeat.poll_tick(cx).is_ready() {
self.heartbeat();
}
// poll the timeouts for pings and status'
loop {
match self.inbound_ping_peers.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(peer_id))) => {
self.inbound_ping_peers.insert(peer_id);
self.events.push(PeerManagerEvent::Ping(peer_id));
}
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for inbound peers to ping"; "error" => e.to_string())
}
Poll::Ready(None) | Poll::Pending => break,
}
}
loop {
match self.outbound_ping_peers.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(peer_id))) => {
self.outbound_ping_peers.insert(peer_id);
self.events.push(PeerManagerEvent::Ping(peer_id));
}
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for outbound peers to ping"; "error" => e.to_string())
}
Poll::Ready(None) | Poll::Pending => break,
}
}
if !matches!(
self.network_globals.sync_state(),
SyncState::SyncingFinalized { .. } | SyncState::SyncingHead { .. }
) {
loop {
match self.status_peers.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(peer_id))) => {
self.status_peers.insert(peer_id);
self.events.push(PeerManagerEvent::Status(peer_id))
}
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for peers to ping"; "error" => e.to_string())
}
Poll::Ready(None) | Poll::Pending => break,
}
}
}
if !self.events.is_empty() {
return Poll::Ready(Some(self.events.remove(0)));
} else {
self.events.shrink_to_fit();
}
Poll::Pending
}
}
enum ConnectingType {
/// We are in the process of dialing this peer.
Dialing,