mirror of
https://github.com/sigp/lighthouse.git
synced 2026-07-03 04:44:28 +00:00
Remove duplicated connection limits checks (#6156)
* move main Behaviour to mod.rs for better readibility and remove connection limits checks after connection has been established, as those checks have already been done by connection limits Behaviour. * improve logging wording wrt dial logic when we call dial_peer we are not yet dialing but just adding the peer to the dial queue * do not use a constant for MAX_CONNECTIONS_PER_PEER we only use it at one place, and the function call is explicit. * address review and re-instate connection limits checks, but do it before the connection has been established. * Merge branch 'unstable' of github.com:sigp/lighthouse into remove-dial-error-denied * Merge branch 'unstable' of github.com:sigp/lighthouse into remove-dial-error-denied
This commit is contained in:
@@ -338,15 +338,15 @@ impl<E: EthSpec> PeerManager<E> {
|
|||||||
{
|
{
|
||||||
// This should be updated with the peer dialing. In fact created once the peer is
|
// This should be updated with the peer dialing. In fact created once the peer is
|
||||||
// dialed
|
// dialed
|
||||||
|
let peer_id = enr.peer_id();
|
||||||
if let Some(min_ttl) = min_ttl {
|
if let Some(min_ttl) = min_ttl {
|
||||||
self.network_globals
|
self.network_globals
|
||||||
.peers
|
.peers
|
||||||
.write()
|
.write()
|
||||||
.update_min_ttl(&enr.peer_id(), min_ttl);
|
.update_min_ttl(&peer_id, min_ttl);
|
||||||
}
|
}
|
||||||
let peer_id = enr.peer_id();
|
|
||||||
if self.dial_peer(enr) {
|
if self.dial_peer(enr) {
|
||||||
debug!(self.log, "Dialing discovered peer"; "peer_id" => %peer_id);
|
debug!(self.log, "Added discovered ENR peer to dial queue"; "peer_id" => %peer_id);
|
||||||
to_dial_peers += 1;
|
to_dial_peers += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -447,18 +447,6 @@ impl<E: EthSpec> PeerManager<E> {
|
|||||||
self.network_globals.peers.read().is_connected(peer_id)
|
self.network_globals.peers.read().is_connected(peer_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports whether the peer limit is reached in which case we stop allowing new incoming
|
|
||||||
/// connections.
|
|
||||||
pub fn peer_limit_reached(&self, count_dialing: bool) -> bool {
|
|
||||||
if count_dialing {
|
|
||||||
// This is an incoming connection so limit by the standard max peers
|
|
||||||
self.network_globals.connected_or_dialing_peers() >= self.max_peers()
|
|
||||||
} else {
|
|
||||||
// We dialed this peer, allow up to max_outbound_dialing_peers
|
|
||||||
self.network_globals.connected_peers() >= self.max_outbound_dialing_peers()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Updates `PeerInfo` with `identify` information.
|
/// Updates `PeerInfo` with `identify` information.
|
||||||
pub fn identify(&mut self, peer_id: &PeerId, info: &IdentifyInfo) {
|
pub fn identify(&mut self, peer_id: &PeerId, info: &IdentifyInfo) {
|
||||||
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
|
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
|
||||||
|
|||||||
@@ -15,7 +15,6 @@ use slog::{debug, error, trace};
|
|||||||
use types::EthSpec;
|
use types::EthSpec;
|
||||||
|
|
||||||
use crate::discovery::enr_ext::EnrExt;
|
use crate::discovery::enr_ext::EnrExt;
|
||||||
use crate::rpc::GoodbyeReason;
|
|
||||||
use crate::types::SyncState;
|
use crate::types::SyncState;
|
||||||
use crate::{metrics, ClearDialError};
|
use crate::{metrics, ClearDialError};
|
||||||
|
|
||||||
@@ -94,26 +93,20 @@ impl<E: EthSpec> NetworkBehaviour for PeerManager<E> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if let Some(enr) = self.peers_to_dial.pop() {
|
if let Some(enr) = self.peers_to_dial.pop() {
|
||||||
let peer_id = enr.peer_id();
|
self.inject_peer_connection(&enr.peer_id(), ConnectingType::Dialing, Some(enr.clone()));
|
||||||
self.inject_peer_connection(&peer_id, ConnectingType::Dialing, Some(enr.clone()));
|
|
||||||
|
|
||||||
let quic_multiaddrs = if self.quic_enabled {
|
|
||||||
let quic_multiaddrs = enr.multiaddr_quic();
|
|
||||||
if !quic_multiaddrs.is_empty() {
|
|
||||||
debug!(self.log, "Dialing QUIC supported peer"; "peer_id"=> %peer_id, "quic_multiaddrs" => ?quic_multiaddrs);
|
|
||||||
}
|
|
||||||
quic_multiaddrs
|
|
||||||
} else {
|
|
||||||
Vec::new()
|
|
||||||
};
|
|
||||||
|
|
||||||
// Prioritize Quic connections over Tcp ones.
|
// Prioritize Quic connections over Tcp ones.
|
||||||
let multiaddrs = quic_multiaddrs
|
let multiaddrs = [
|
||||||
.into_iter()
|
self.quic_enabled
|
||||||
.chain(enr.multiaddr_tcp())
|
.then_some(enr.multiaddr_quic())
|
||||||
.collect();
|
.unwrap_or_default(),
|
||||||
|
enr.multiaddr_tcp(),
|
||||||
|
]
|
||||||
|
.concat();
|
||||||
|
|
||||||
|
debug!(self.log, "Dialing peer"; "peer_id"=> %enr.peer_id(), "multiaddrs" => ?multiaddrs);
|
||||||
return Poll::Ready(ToSwarm::Dial {
|
return Poll::Ready(ToSwarm::Dial {
|
||||||
opts: DialOpts::peer_id(peer_id)
|
opts: DialOpts::peer_id(enr.peer_id())
|
||||||
.condition(PeerCondition::Disconnected)
|
.condition(PeerCondition::Disconnected)
|
||||||
.addresses(multiaddrs)
|
.addresses(multiaddrs)
|
||||||
.build(),
|
.build(),
|
||||||
@@ -130,14 +123,7 @@ impl<E: EthSpec> NetworkBehaviour for PeerManager<E> {
|
|||||||
endpoint,
|
endpoint,
|
||||||
other_established,
|
other_established,
|
||||||
..
|
..
|
||||||
}) => {
|
}) => self.on_connection_established(peer_id, endpoint, other_established),
|
||||||
// NOTE: We still need to handle the [`ConnectionEstablished`] because the
|
|
||||||
// [`NetworkBehaviour::handle_established_inbound_connection`] and
|
|
||||||
// [`NetworkBehaviour::handle_established_outbound_connection`] are fallible. This
|
|
||||||
// means another behaviour can kill the connection early, and we can't assume a
|
|
||||||
// peer as connected until this event is received.
|
|
||||||
self.on_connection_established(peer_id, endpoint, other_established)
|
|
||||||
}
|
|
||||||
FromSwarm::ConnectionClosed(ConnectionClosed {
|
FromSwarm::ConnectionClosed(ConnectionClosed {
|
||||||
peer_id,
|
peer_id,
|
||||||
endpoint,
|
endpoint,
|
||||||
@@ -206,6 +192,21 @@ impl<E: EthSpec> NetworkBehaviour for PeerManager<E> {
|
|||||||
"Connection to peer rejected: peer has a bad score",
|
"Connection to peer rejected: peer has a bad score",
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check the connection limits
|
||||||
|
if self.network_globals.connected_or_dialing_peers() >= self.max_peers()
|
||||||
|
&& self
|
||||||
|
.network_globals
|
||||||
|
.peers
|
||||||
|
.read()
|
||||||
|
.peer_info(&peer_id)
|
||||||
|
.map_or(true, |peer| !peer.has_future_duty())
|
||||||
|
{
|
||||||
|
return Err(ConnectionDenied::new(
|
||||||
|
"Connection to peer rejected: too many connections",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
Ok(ConnectionHandler)
|
Ok(ConnectionHandler)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -218,13 +219,26 @@ impl<E: EthSpec> NetworkBehaviour for PeerManager<E> {
|
|||||||
_port_use: PortUse,
|
_port_use: PortUse,
|
||||||
) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
|
) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
|
||||||
trace!(self.log, "Outbound connection"; "peer_id" => %peer_id, "multiaddr" => %addr);
|
trace!(self.log, "Outbound connection"; "peer_id" => %peer_id, "multiaddr" => %addr);
|
||||||
match self.ban_status(&peer_id) {
|
if let Some(cause) = self.ban_status(&peer_id) {
|
||||||
Some(cause) => {
|
|
||||||
error!(self.log, "Connected a banned peer. Rejecting connection"; "peer_id" => %peer_id);
|
error!(self.log, "Connected a banned peer. Rejecting connection"; "peer_id" => %peer_id);
|
||||||
Err(ConnectionDenied::new(cause))
|
return Err(ConnectionDenied::new(cause));
|
||||||
}
|
}
|
||||||
None => Ok(ConnectionHandler),
|
|
||||||
|
// Check the connection limits
|
||||||
|
if self.network_globals.connected_peers() >= self.max_outbound_dialing_peers()
|
||||||
|
&& self
|
||||||
|
.network_globals
|
||||||
|
.peers
|
||||||
|
.read()
|
||||||
|
.peer_info(&peer_id)
|
||||||
|
.map_or(true, |peer| !peer.has_future_duty())
|
||||||
|
{
|
||||||
|
return Err(ConnectionDenied::new(
|
||||||
|
"Connection to peer rejected: too many connections",
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(ConnectionHandler)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -233,7 +247,7 @@ impl<E: EthSpec> PeerManager<E> {
|
|||||||
&mut self,
|
&mut self,
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
endpoint: &ConnectedPoint,
|
endpoint: &ConnectedPoint,
|
||||||
other_established: usize,
|
_other_established: usize,
|
||||||
) {
|
) {
|
||||||
debug!(self.log, "Connection established"; "peer_id" => %peer_id,
|
debug!(self.log, "Connection established"; "peer_id" => %peer_id,
|
||||||
"multiaddr" => %endpoint.get_remote_address(),
|
"multiaddr" => %endpoint.get_remote_address(),
|
||||||
@@ -247,26 +261,6 @@ impl<E: EthSpec> PeerManager<E> {
|
|||||||
self.update_peer_count_metrics();
|
self.update_peer_count_metrics();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Count dialing peers in the limit if the peer dialed us.
|
|
||||||
let count_dialing = endpoint.is_listener();
|
|
||||||
// Check the connection limits
|
|
||||||
if self.peer_limit_reached(count_dialing)
|
|
||||||
&& self
|
|
||||||
.network_globals
|
|
||||||
.peers
|
|
||||||
.read()
|
|
||||||
.peer_info(&peer_id)
|
|
||||||
.map_or(true, |peer| !peer.has_future_duty())
|
|
||||||
{
|
|
||||||
// Gracefully disconnect the peer.
|
|
||||||
self.disconnect_peer(peer_id, GoodbyeReason::TooManyPeers);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if other_established == 0 {
|
|
||||||
self.events.push(PeerManagerEvent::MetaData(peer_id));
|
|
||||||
}
|
|
||||||
|
|
||||||
// NOTE: We don't register peers that we are disconnecting immediately. The network service
|
// NOTE: We don't register peers that we are disconnecting immediately. The network service
|
||||||
// does not need to know about these peers.
|
// does not need to know about these peers.
|
||||||
match endpoint {
|
match endpoint {
|
||||||
|
|||||||
@@ -1,39 +0,0 @@
|
|||||||
use crate::discovery::Discovery;
|
|
||||||
use crate::peer_manager::PeerManager;
|
|
||||||
use crate::rpc::RPC;
|
|
||||||
use crate::types::SnappyTransform;
|
|
||||||
|
|
||||||
use libp2p::identify;
|
|
||||||
use libp2p::swarm::behaviour::toggle::Toggle;
|
|
||||||
use libp2p::swarm::NetworkBehaviour;
|
|
||||||
use libp2p::upnp::tokio::Behaviour as Upnp;
|
|
||||||
use types::EthSpec;
|
|
||||||
|
|
||||||
use super::api_types::RequestId;
|
|
||||||
|
|
||||||
pub type SubscriptionFilter =
|
|
||||||
gossipsub::MaxCountSubscriptionFilter<gossipsub::WhitelistSubscriptionFilter>;
|
|
||||||
pub type Gossipsub = gossipsub::Behaviour<SnappyTransform, SubscriptionFilter>;
|
|
||||||
|
|
||||||
#[derive(NetworkBehaviour)]
|
|
||||||
pub(crate) struct Behaviour<E>
|
|
||||||
where
|
|
||||||
E: EthSpec,
|
|
||||||
{
|
|
||||||
/// Keep track of active and pending connections to enforce hard limits.
|
|
||||||
pub connection_limits: libp2p::connection_limits::Behaviour,
|
|
||||||
/// The peer manager that keeps track of peer's reputation and status.
|
|
||||||
pub peer_manager: PeerManager<E>,
|
|
||||||
/// The Eth2 RPC specified in the wire-0 protocol.
|
|
||||||
pub eth2_rpc: RPC<RequestId, E>,
|
|
||||||
/// Discv5 Discovery protocol.
|
|
||||||
pub discovery: Discovery<E>,
|
|
||||||
/// Keep regular connection to peers and disconnect if absent.
|
|
||||||
// NOTE: The id protocol is used for initial interop. This will be removed by mainnet.
|
|
||||||
/// Provides IP addresses and peer information.
|
|
||||||
pub identify: identify::Behaviour,
|
|
||||||
/// Libp2p UPnP port mapping.
|
|
||||||
pub upnp: Toggle<Upnp>,
|
|
||||||
/// The routing pub-sub mechanism for eth2.
|
|
||||||
pub gossipsub: Gossipsub,
|
|
||||||
}
|
|
||||||
@@ -1,4 +1,3 @@
|
|||||||
use self::behaviour::Behaviour;
|
|
||||||
use self::gossip_cache::GossipCache;
|
use self::gossip_cache::GossipCache;
|
||||||
use crate::config::{gossipsub_config, GossipsubConfigParams, NetworkLoad};
|
use crate::config::{gossipsub_config, GossipsubConfigParams, NetworkLoad};
|
||||||
use crate::discovery::{
|
use crate::discovery::{
|
||||||
@@ -14,8 +13,6 @@ use crate::rpc::{
|
|||||||
self, GoodbyeReason, HandlerErr, NetworkParams, Protocol, RPCError, RPCMessage, RPCReceived,
|
self, GoodbyeReason, HandlerErr, NetworkParams, Protocol, RPCError, RPCMessage, RPCReceived,
|
||||||
RequestType, ResponseTermination, RpcErrorResponse, RpcResponse, RpcSuccessResponse, RPC,
|
RequestType, ResponseTermination, RpcErrorResponse, RpcResponse, RpcSuccessResponse, RPC,
|
||||||
};
|
};
|
||||||
use crate::service::behaviour::BehaviourEvent;
|
|
||||||
pub use crate::service::behaviour::Gossipsub;
|
|
||||||
use crate::types::{
|
use crate::types::{
|
||||||
attestation_sync_committee_topics, fork_core_topics, subnet_from_topic_hash, GossipEncoding,
|
attestation_sync_committee_topics, fork_core_topics, subnet_from_topic_hash, GossipEncoding,
|
||||||
GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, ALTAIR_CORE_TOPICS,
|
GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, ALTAIR_CORE_TOPICS,
|
||||||
@@ -33,7 +30,8 @@ use gossipsub::{
|
|||||||
use gossipsub_scoring_parameters::{lighthouse_gossip_thresholds, PeerScoreSettings};
|
use gossipsub_scoring_parameters::{lighthouse_gossip_thresholds, PeerScoreSettings};
|
||||||
use libp2p::multiaddr::{self, Multiaddr, Protocol as MProtocol};
|
use libp2p::multiaddr::{self, Multiaddr, Protocol as MProtocol};
|
||||||
use libp2p::swarm::behaviour::toggle::Toggle;
|
use libp2p::swarm::behaviour::toggle::Toggle;
|
||||||
use libp2p::swarm::{Swarm, SwarmEvent};
|
use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent};
|
||||||
|
use libp2p::upnp::tokio::Behaviour as Upnp;
|
||||||
use libp2p::{identify, PeerId, SwarmBuilder};
|
use libp2p::{identify, PeerId, SwarmBuilder};
|
||||||
use slog::{crit, debug, info, o, trace, warn};
|
use slog::{crit, debug, info, o, trace, warn};
|
||||||
use std::num::{NonZeroU8, NonZeroUsize};
|
use std::num::{NonZeroU8, NonZeroUsize};
|
||||||
@@ -47,10 +45,9 @@ use types::{
|
|||||||
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId,
|
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId,
|
||||||
};
|
};
|
||||||
use types::{ChainSpec, ForkName};
|
use types::{ChainSpec, ForkName};
|
||||||
use utils::{build_transport, strip_peer_id, Context as ServiceContext, MAX_CONNECTIONS_PER_PEER};
|
use utils::{build_transport, strip_peer_id, Context as ServiceContext};
|
||||||
|
|
||||||
pub mod api_types;
|
pub mod api_types;
|
||||||
mod behaviour;
|
|
||||||
mod gossip_cache;
|
mod gossip_cache;
|
||||||
pub mod gossipsub_scoring_parameters;
|
pub mod gossipsub_scoring_parameters;
|
||||||
pub mod utils;
|
pub mod utils;
|
||||||
@@ -109,6 +106,41 @@ pub enum NetworkEvent<E: EthSpec> {
|
|||||||
ZeroListeners,
|
ZeroListeners,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub type Gossipsub = gossipsub::Behaviour<SnappyTransform, SubscriptionFilter>;
|
||||||
|
pub type SubscriptionFilter =
|
||||||
|
gossipsub::MaxCountSubscriptionFilter<gossipsub::WhitelistSubscriptionFilter>;
|
||||||
|
|
||||||
|
#[derive(NetworkBehaviour)]
|
||||||
|
pub(crate) struct Behaviour<E>
|
||||||
|
where
|
||||||
|
E: EthSpec,
|
||||||
|
{
|
||||||
|
// NOTE: The order of the following list of behaviours has meaning,
|
||||||
|
// `NetworkBehaviour::handle_{pending, established}_{inbound, outbound}` methods
|
||||||
|
// are called sequentially for each behaviour and they are fallible,
|
||||||
|
// therefore we want `connection_limits` and `peer_manager` running first,
|
||||||
|
// which are the behaviours that may reject a connection, so that
|
||||||
|
// when the subsequent behaviours are called they are certain the connection won't be rejected.
|
||||||
|
|
||||||
|
//
|
||||||
|
/// Keep track of active and pending connections to enforce hard limits.
|
||||||
|
pub connection_limits: libp2p::connection_limits::Behaviour,
|
||||||
|
/// The peer manager that keeps track of peer's reputation and status.
|
||||||
|
pub peer_manager: PeerManager<E>,
|
||||||
|
/// The Eth2 RPC specified in the wire-0 protocol.
|
||||||
|
pub eth2_rpc: RPC<RequestId, E>,
|
||||||
|
/// Discv5 Discovery protocol.
|
||||||
|
pub discovery: Discovery<E>,
|
||||||
|
/// Keep regular connection to peers and disconnect if absent.
|
||||||
|
// NOTE: The id protocol is used for initial interop. This will be removed by mainnet.
|
||||||
|
/// Provides IP addresses and peer information.
|
||||||
|
pub identify: identify::Behaviour,
|
||||||
|
/// Libp2p UPnP port mapping.
|
||||||
|
pub upnp: Toggle<Upnp>,
|
||||||
|
/// The routing pub-sub mechanism for eth2.
|
||||||
|
pub gossipsub: Gossipsub,
|
||||||
|
}
|
||||||
|
|
||||||
/// Builds the network behaviour that manages the core protocols of eth2.
|
/// Builds the network behaviour that manages the core protocols of eth2.
|
||||||
/// This core behaviour is managed by `Behaviour` which adds peer management to all core
|
/// This core behaviour is managed by `Behaviour` which adds peer management to all core
|
||||||
/// behaviours.
|
/// behaviours.
|
||||||
@@ -396,7 +428,7 @@ impl<E: EthSpec> Network<E> {
|
|||||||
(config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR + PRIORITY_PEER_EXCESS))
|
(config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR + PRIORITY_PEER_EXCESS))
|
||||||
.ceil() as u32,
|
.ceil() as u32,
|
||||||
))
|
))
|
||||||
.with_max_established_per_peer(Some(MAX_CONNECTIONS_PER_PEER));
|
.with_max_established_per_peer(Some(1));
|
||||||
|
|
||||||
libp2p::connection_limits::Behaviour::new(limits)
|
libp2p::connection_limits::Behaviour::new(limits)
|
||||||
};
|
};
|
||||||
@@ -1198,7 +1230,7 @@ impl<E: EthSpec> Network<E> {
|
|||||||
self.discovery_mut().remove_cached_enr(&enr.peer_id());
|
self.discovery_mut().remove_cached_enr(&enr.peer_id());
|
||||||
let peer_id = enr.peer_id();
|
let peer_id = enr.peer_id();
|
||||||
if self.peer_manager_mut().dial_peer(enr) {
|
if self.peer_manager_mut().dial_peer(enr) {
|
||||||
debug!(self.log, "Dialing cached ENR peer"; "peer_id" => %peer_id);
|
debug!(self.log, "Added cached ENR peer to dial queue"; "peer_id" => %peer_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,8 +24,6 @@ use types::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
pub const NETWORK_KEY_FILENAME: &str = "key";
|
pub const NETWORK_KEY_FILENAME: &str = "key";
|
||||||
/// The maximum simultaneous libp2p connections per peer.
|
|
||||||
pub const MAX_CONNECTIONS_PER_PEER: u32 = 1;
|
|
||||||
/// The filename to store our local metadata.
|
/// The filename to store our local metadata.
|
||||||
pub const METADATA_FILENAME: &str = "metadata";
|
pub const METADATA_FILENAME: &str = "metadata";
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user