mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-08 01:05:47 +00:00
Upgrade to libp2p v0.50.0 (#3764)
I've needed to do this work in order to do some episub testing. This version of libp2p has not yet been released, so this is left as a draft for when we wish to update. Co-authored-by: Diva M <divma@protonmail.com>
This commit is contained in:
@@ -22,12 +22,13 @@ use enr::{ATTESTATION_BITFIELD_ENR_KEY, ETH2_ENR_KEY, SYNC_COMMITTEE_BITFIELD_EN
|
||||
use futures::prelude::*;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use libp2p::multiaddr::Protocol;
|
||||
use libp2p::swarm::behaviour::{DialFailure, FromSwarm};
|
||||
use libp2p::swarm::AddressScore;
|
||||
pub use libp2p::{
|
||||
core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId},
|
||||
swarm::{
|
||||
handler::ConnectionHandler, DialError, NetworkBehaviour,
|
||||
NetworkBehaviourAction as NBAction, NotifyHandler, PollParameters, SubstreamProtocol,
|
||||
dummy::ConnectionHandler, DialError, NetworkBehaviour, NetworkBehaviourAction as NBAction,
|
||||
NotifyHandler, PollParameters, SubstreamProtocol,
|
||||
},
|
||||
};
|
||||
use lru::LruCache;
|
||||
@@ -927,11 +928,11 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
|
||||
|
||||
impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
|
||||
// Discovery is not a real NetworkBehaviour...
|
||||
type ConnectionHandler = libp2p::swarm::handler::DummyConnectionHandler;
|
||||
type ConnectionHandler = ConnectionHandler;
|
||||
type OutEvent = DiscoveredPeers;
|
||||
|
||||
fn new_handler(&mut self) -> Self::ConnectionHandler {
|
||||
libp2p::swarm::handler::DummyConnectionHandler::default()
|
||||
ConnectionHandler
|
||||
}
|
||||
|
||||
// Handles the libp2p request to obtain multiaddrs for peer_id's in order to dial them.
|
||||
@@ -947,40 +948,6 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_event(
|
||||
&mut self,
|
||||
_: PeerId,
|
||||
_: ConnectionId,
|
||||
_: <Self::ConnectionHandler as ConnectionHandler>::OutEvent,
|
||||
) {
|
||||
}
|
||||
|
||||
fn inject_dial_failure(
|
||||
&mut self,
|
||||
peer_id: Option<PeerId>,
|
||||
_handler: Self::ConnectionHandler,
|
||||
error: &DialError,
|
||||
) {
|
||||
if let Some(peer_id) = peer_id {
|
||||
match error {
|
||||
DialError::Banned
|
||||
| DialError::LocalPeerId
|
||||
| DialError::InvalidPeerId(_)
|
||||
| DialError::ConnectionIo(_)
|
||||
| DialError::NoAddresses
|
||||
| DialError::Transport(_)
|
||||
| DialError::WrongPeerId { .. } => {
|
||||
// set peer as disconnected in discovery DHT
|
||||
debug!(self.log, "Marking peer disconnected in DHT"; "peer_id" => %peer_id);
|
||||
self.disconnect_peer(&peer_id);
|
||||
}
|
||||
DialError::ConnectionLimit(_)
|
||||
| DialError::DialPeerConditionFalse(_)
|
||||
| DialError::Aborted => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Main execution loop to drive the behaviour
|
||||
fn poll(
|
||||
&mut self,
|
||||
@@ -1067,6 +1034,50 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
|
||||
match event {
|
||||
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
|
||||
self.on_dial_failure(peer_id, error)
|
||||
}
|
||||
FromSwarm::ConnectionEstablished(_)
|
||||
| FromSwarm::ConnectionClosed(_)
|
||||
| FromSwarm::AddressChange(_)
|
||||
| FromSwarm::ListenFailure(_)
|
||||
| FromSwarm::NewListener(_)
|
||||
| FromSwarm::NewListenAddr(_)
|
||||
| FromSwarm::ExpiredListenAddr(_)
|
||||
| FromSwarm::ListenerError(_)
|
||||
| FromSwarm::ListenerClosed(_)
|
||||
| FromSwarm::NewExternalAddr(_)
|
||||
| FromSwarm::ExpiredExternalAddr(_) => {
|
||||
// Ignore events not relevant to discovery
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSpec: EthSpec> Discovery<TSpec> {
|
||||
fn on_dial_failure(&mut self, peer_id: Option<PeerId>, error: &DialError) {
|
||||
if let Some(peer_id) = peer_id {
|
||||
match error {
|
||||
DialError::Banned
|
||||
| DialError::LocalPeerId
|
||||
| DialError::InvalidPeerId(_)
|
||||
| DialError::ConnectionIo(_)
|
||||
| DialError::NoAddresses
|
||||
| DialError::Transport(_)
|
||||
| DialError::WrongPeerId { .. } => {
|
||||
// set peer as disconnected in discovery DHT
|
||||
debug!(self.log, "Marking peer disconnected in DHT"; "peer_id" => %peer_id);
|
||||
self.disconnect_peer(&peer_id);
|
||||
}
|
||||
DialError::ConnectionLimit(_)
|
||||
| DialError::DialPeerConditionFalse(_)
|
||||
| DialError::Aborted => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -7,7 +7,7 @@ use crate::{NetworkGlobals, PeerId};
|
||||
use crate::{Subnet, SubnetDiscovery};
|
||||
use delay_map::HashSetDelay;
|
||||
use discv5::Enr;
|
||||
use libp2p::identify::IdentifyInfo;
|
||||
use libp2p::identify::Info as IdentifyInfo;
|
||||
use peerdb::{client::ClientKind, BanOperation, BanResult, ScoreUpdateResult};
|
||||
use rand::seq::SliceRandom;
|
||||
use slog::{debug, error, trace, warn};
|
||||
|
||||
@@ -1,14 +1,12 @@
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use futures::StreamExt;
|
||||
use libp2p::core::connection::ConnectionId;
|
||||
use libp2p::core::ConnectedPoint;
|
||||
use libp2p::swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
|
||||
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
|
||||
use libp2p::swarm::handler::DummyConnectionHandler;
|
||||
use libp2p::swarm::{
|
||||
ConnectionHandler, DialError, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
|
||||
};
|
||||
use libp2p::{Multiaddr, PeerId};
|
||||
use libp2p::swarm::dummy::ConnectionHandler;
|
||||
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
|
||||
use libp2p::PeerId;
|
||||
use slog::{debug, error};
|
||||
use types::EthSpec;
|
||||
|
||||
@@ -20,23 +18,14 @@ use super::peerdb::BanResult;
|
||||
use super::{ConnectingType, PeerManager, PeerManagerEvent, ReportSource};
|
||||
|
||||
impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
|
||||
type ConnectionHandler = DummyConnectionHandler;
|
||||
type ConnectionHandler = ConnectionHandler;
|
||||
|
||||
type OutEvent = PeerManagerEvent;
|
||||
|
||||
/* Required trait members */
|
||||
|
||||
fn new_handler(&mut self) -> Self::ConnectionHandler {
|
||||
DummyConnectionHandler::default()
|
||||
}
|
||||
|
||||
fn inject_event(
|
||||
&mut self,
|
||||
_: PeerId,
|
||||
_: ConnectionId,
|
||||
_: <DummyConnectionHandler as ConnectionHandler>::OutEvent,
|
||||
) {
|
||||
unreachable!("Dummy handler does not emit events")
|
||||
ConnectionHandler
|
||||
}
|
||||
|
||||
fn poll(
|
||||
@@ -114,19 +103,46 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
/* Overwritten trait members */
|
||||
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
|
||||
match event {
|
||||
FromSwarm::ConnectionEstablished(ConnectionEstablished {
|
||||
peer_id,
|
||||
endpoint,
|
||||
other_established,
|
||||
..
|
||||
}) => self.on_connection_established(peer_id, endpoint, other_established),
|
||||
FromSwarm::ConnectionClosed(ConnectionClosed {
|
||||
peer_id,
|
||||
remaining_established,
|
||||
..
|
||||
}) => self.on_connection_closed(peer_id, remaining_established),
|
||||
FromSwarm::DialFailure(DialFailure { peer_id, .. }) => self.on_dial_failure(peer_id),
|
||||
FromSwarm::AddressChange(_)
|
||||
| FromSwarm::ListenFailure(_)
|
||||
| FromSwarm::NewListener(_)
|
||||
| FromSwarm::NewListenAddr(_)
|
||||
| FromSwarm::ExpiredListenAddr(_)
|
||||
| FromSwarm::ListenerError(_)
|
||||
| FromSwarm::ListenerClosed(_)
|
||||
| FromSwarm::NewExternalAddr(_)
|
||||
| FromSwarm::ExpiredExternalAddr(_) => {
|
||||
// The rest of the events we ignore since they are handled in their associated
|
||||
// `SwarmEvent`
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_connection_established(
|
||||
impl<TSpec: EthSpec> PeerManager<TSpec> {
|
||||
fn on_connection_established(
|
||||
&mut self,
|
||||
peer_id: &PeerId,
|
||||
_connection_id: &ConnectionId,
|
||||
peer_id: PeerId,
|
||||
endpoint: &ConnectedPoint,
|
||||
_failed_addresses: Option<&Vec<Multiaddr>>,
|
||||
other_established: usize,
|
||||
) {
|
||||
debug!(self.log, "Connection established"; "peer_id" => %peer_id, "connection" => ?endpoint.to_endpoint());
|
||||
if other_established == 0 {
|
||||
self.events.push(PeerManagerEvent::MetaData(*peer_id));
|
||||
self.events.push(PeerManagerEvent::MetaData(peer_id));
|
||||
}
|
||||
|
||||
// Check NAT if metrics are enabled
|
||||
@@ -135,20 +151,20 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
|
||||
}
|
||||
|
||||
// Check to make sure the peer is not supposed to be banned
|
||||
match self.ban_status(peer_id) {
|
||||
match self.ban_status(&peer_id) {
|
||||
// TODO: directly emit the ban event?
|
||||
BanResult::BadScore => {
|
||||
// This is a faulty state
|
||||
error!(self.log, "Connected to a banned peer. Re-banning"; "peer_id" => %peer_id);
|
||||
// Reban the peer
|
||||
self.goodbye_peer(peer_id, GoodbyeReason::Banned, ReportSource::PeerManager);
|
||||
self.goodbye_peer(&peer_id, GoodbyeReason::Banned, ReportSource::PeerManager);
|
||||
return;
|
||||
}
|
||||
BanResult::BannedIp(ip_addr) => {
|
||||
// A good peer has connected to us via a banned IP address. We ban the peer and
|
||||
// prevent future connections.
|
||||
debug!(self.log, "Peer connected via banned IP. Banning"; "peer_id" => %peer_id, "banned_ip" => %ip_addr);
|
||||
self.goodbye_peer(peer_id, GoodbyeReason::BannedIP, ReportSource::PeerManager);
|
||||
self.goodbye_peer(&peer_id, GoodbyeReason::BannedIP, ReportSource::PeerManager);
|
||||
return;
|
||||
}
|
||||
BanResult::NotBanned => {}
|
||||
@@ -162,11 +178,11 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
|
||||
.network_globals
|
||||
.peers
|
||||
.read()
|
||||
.peer_info(peer_id)
|
||||
.peer_info(&peer_id)
|
||||
.map_or(true, |peer| !peer.has_future_duty())
|
||||
{
|
||||
// Gracefully disconnect the peer.
|
||||
self.disconnect_peer(*peer_id, GoodbyeReason::TooManyPeers);
|
||||
self.disconnect_peer(peer_id, GoodbyeReason::TooManyPeers);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -174,14 +190,14 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
|
||||
// does not need to know about these peers.
|
||||
match endpoint {
|
||||
ConnectedPoint::Listener { send_back_addr, .. } => {
|
||||
self.inject_connect_ingoing(peer_id, send_back_addr.clone(), None);
|
||||
self.inject_connect_ingoing(&peer_id, send_back_addr.clone(), None);
|
||||
self.events
|
||||
.push(PeerManagerEvent::PeerConnectedIncoming(*peer_id));
|
||||
.push(PeerManagerEvent::PeerConnectedIncoming(peer_id));
|
||||
}
|
||||
ConnectedPoint::Dialer { address, .. } => {
|
||||
self.inject_connect_outgoing(peer_id, address.clone(), None);
|
||||
self.inject_connect_outgoing(&peer_id, address.clone(), None);
|
||||
self.events
|
||||
.push(PeerManagerEvent::PeerConnectedOutgoing(*peer_id));
|
||||
.push(PeerManagerEvent::PeerConnectedOutgoing(peer_id));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -189,14 +205,8 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
|
||||
self.update_connected_peer_metrics();
|
||||
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
|
||||
}
|
||||
fn inject_connection_closed(
|
||||
&mut self,
|
||||
peer_id: &PeerId,
|
||||
_: &ConnectionId,
|
||||
_: &ConnectedPoint,
|
||||
_: DummyConnectionHandler,
|
||||
remaining_established: usize,
|
||||
) {
|
||||
|
||||
fn on_connection_closed(&mut self, peer_id: PeerId, remaining_established: usize) {
|
||||
if remaining_established > 0 {
|
||||
return;
|
||||
}
|
||||
@@ -206,62 +216,33 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
|
||||
.network_globals
|
||||
.peers
|
||||
.read()
|
||||
.is_connected_or_disconnecting(peer_id)
|
||||
.is_connected_or_disconnecting(&peer_id)
|
||||
{
|
||||
// We are disconnecting the peer or the peer has already been connected.
|
||||
// Both these cases, the peer has been previously registered by the peer manager and
|
||||
// potentially the application layer.
|
||||
// Inform the application.
|
||||
self.events
|
||||
.push(PeerManagerEvent::PeerDisconnected(*peer_id));
|
||||
.push(PeerManagerEvent::PeerDisconnected(peer_id));
|
||||
debug!(self.log, "Peer disconnected"; "peer_id" => %peer_id);
|
||||
}
|
||||
|
||||
// NOTE: It may be the case that a rejected node, due to too many peers is disconnected
|
||||
// here and the peer manager has no knowledge of its connection. We insert it here for
|
||||
// reference so that peer manager can track this peer.
|
||||
self.inject_disconnect(peer_id);
|
||||
self.inject_disconnect(&peer_id);
|
||||
|
||||
// Update the prometheus metrics
|
||||
self.update_connected_peer_metrics();
|
||||
metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT);
|
||||
}
|
||||
|
||||
fn inject_address_change(
|
||||
&mut self,
|
||||
_peer_id: &PeerId,
|
||||
_connection_id: &ConnectionId,
|
||||
old: &ConnectedPoint,
|
||||
new: &ConnectedPoint,
|
||||
) {
|
||||
debug_assert!(
|
||||
matches!(
|
||||
(old, new),
|
||||
(
|
||||
// inbound remains inbound
|
||||
ConnectedPoint::Listener { .. },
|
||||
ConnectedPoint::Listener { .. }
|
||||
) | (
|
||||
// outbound remains outbound
|
||||
ConnectedPoint::Dialer { .. },
|
||||
ConnectedPoint::Dialer { .. }
|
||||
)
|
||||
),
|
||||
"A peer has changed between inbound and outbound"
|
||||
)
|
||||
}
|
||||
|
||||
/// A dial attempt has failed.
|
||||
///
|
||||
/// NOTE: It can be the case that we are dialing a peer and during the dialing process the peer
|
||||
/// connects and the dial attempt later fails. To handle this, we only update the peer_db if
|
||||
/// the peer is not already connected.
|
||||
fn inject_dial_failure(
|
||||
&mut self,
|
||||
peer_id: Option<PeerId>,
|
||||
_handler: DummyConnectionHandler,
|
||||
_error: &DialError,
|
||||
) {
|
||||
fn on_dial_failure(&mut self, peer_id: Option<PeerId>) {
|
||||
if let Some(peer_id) = peer_id {
|
||||
if !self.network_globals.peers.read().is_connected(&peer_id) {
|
||||
self.inject_disconnect(&peer_id);
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
//!
|
||||
//! Currently using identify to fingerprint.
|
||||
|
||||
use libp2p::identify::IdentifyInfo;
|
||||
use libp2p::identify::Info as IdentifyInfo;
|
||||
use serde::Serialize;
|
||||
use strum::{AsRefStr, EnumIter, IntoStaticStr};
|
||||
|
||||
|
||||
@@ -7,8 +7,8 @@ use libp2p::gossipsub::subscription_filter::{
|
||||
MaxCountSubscriptionFilter, WhitelistSubscriptionFilter,
|
||||
};
|
||||
use libp2p::gossipsub::Gossipsub as BaseGossipsub;
|
||||
use libp2p::identify::Identify;
|
||||
use libp2p::NetworkBehaviour;
|
||||
use libp2p::identify::Behaviour as Identify;
|
||||
use libp2p::swarm::NetworkBehaviour;
|
||||
use types::EthSpec;
|
||||
|
||||
use super::api_types::RequestId;
|
||||
|
||||
@@ -26,7 +26,7 @@ use libp2p::gossipsub::subscription_filter::MaxCountSubscriptionFilter;
|
||||
use libp2p::gossipsub::{
|
||||
GossipsubEvent, IdentTopic as Topic, MessageAcceptance, MessageAuthenticity, MessageId,
|
||||
};
|
||||
use libp2p::identify::{Identify, IdentifyConfig, IdentifyEvent};
|
||||
use libp2p::identify::{Behaviour as Identify, Config as IdentifyConfig, Event as IdentifyEvent};
|
||||
use libp2p::multiaddr::{Multiaddr, Protocol as MProtocol};
|
||||
use libp2p::swarm::{ConnectionLimits, Swarm, SwarmBuilder, SwarmEvent};
|
||||
use libp2p::PeerId;
|
||||
@@ -316,7 +316,7 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
|
||||
|
||||
// use the executor for libp2p
|
||||
struct Executor(task_executor::TaskExecutor);
|
||||
impl libp2p::core::Executor for Executor {
|
||||
impl libp2p::swarm::Executor for Executor {
|
||||
fn exec(&self, f: Pin<Box<dyn futures::Future<Output = ()> + Send>>) {
|
||||
self.0.spawn(f, "libp2p");
|
||||
}
|
||||
@@ -341,12 +341,16 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
|
||||
.with_max_established_per_peer(Some(MAX_CONNECTIONS_PER_PEER));
|
||||
|
||||
(
|
||||
SwarmBuilder::new(transport, behaviour, local_peer_id)
|
||||
.notify_handler_buffer_size(std::num::NonZeroUsize::new(7).expect("Not zero"))
|
||||
.connection_event_buffer_size(64)
|
||||
.connection_limits(limits)
|
||||
.executor(Box::new(Executor(executor)))
|
||||
.build(),
|
||||
SwarmBuilder::with_executor(
|
||||
transport,
|
||||
behaviour,
|
||||
local_peer_id,
|
||||
Executor(executor),
|
||||
)
|
||||
.notify_handler_buffer_size(std::num::NonZeroUsize::new(7).expect("Not zero"))
|
||||
.connection_event_buffer_size(64)
|
||||
.connection_limits(limits)
|
||||
.build(),
|
||||
bandwidth,
|
||||
)
|
||||
};
|
||||
|
||||
@@ -44,8 +44,7 @@ type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>;
|
||||
pub fn build_transport(
|
||||
local_private_key: Keypair,
|
||||
) -> std::io::Result<(BoxedTransport, Arc<BandwidthSinks>)> {
|
||||
let tcp =
|
||||
libp2p::tcp::TokioTcpTransport::new(libp2p::tcp::GenTcpConfig::default().nodelay(true));
|
||||
let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true));
|
||||
let transport = libp2p::dns::TokioDnsConfig::system(tcp)?;
|
||||
#[cfg(feature = "libp2p-websocket")]
|
||||
let transport = {
|
||||
|
||||
Reference in New Issue
Block a user