Merge branch 'unstable' into merge-unstable-to-deneb-20231005

# Conflicts:
#	.github/workflows/test-suite.yml
#	Cargo.lock
#	beacon_node/execution_layer/Cargo.toml
#	beacon_node/execution_layer/src/test_utils/mock_builder.rs
#	beacon_node/execution_layer/src/test_utils/mod.rs
#	beacon_node/network/src/service/tests.rs
#	consensus/types/src/builder_bid.rs
This commit is contained in:
Jimmy Chen
2023-10-05 15:21:18 +11:00
48 changed files with 1791 additions and 1491 deletions

View File

@@ -11,11 +11,17 @@ use libp2p::Multiaddr;
use serde_derive::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::net::{Ipv4Addr, Ipv6Addr};
use std::num::NonZeroU16;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use types::{ForkContext, ForkName};
pub const DEFAULT_IPV4_ADDRESS: Ipv4Addr = Ipv4Addr::UNSPECIFIED;
pub const DEFAULT_TCP_PORT: u16 = 9000u16;
pub const DEFAULT_DISC_PORT: u16 = 9000u16;
pub const DEFAULT_QUIC_PORT: u16 = 9001u16;
/// The cache time is set to accommodate the circulation time of an attestation.
///
/// The p2p spec declares that we accept attestations within the following range:
@@ -59,22 +65,22 @@ pub struct Config {
pub enr_address: (Option<Ipv4Addr>, Option<Ipv6Addr>),
/// The udp ipv4 port to broadcast to peers in order to reach back for discovery.
pub enr_udp4_port: Option<u16>,
pub enr_udp4_port: Option<NonZeroU16>,
/// The quic ipv4 port to broadcast to peers in order to reach back for libp2p services.
pub enr_quic4_port: Option<u16>,
pub enr_quic4_port: Option<NonZeroU16>,
/// The tcp ipv4 port to broadcast to peers in order to reach back for libp2p services.
pub enr_tcp4_port: Option<u16>,
pub enr_tcp4_port: Option<NonZeroU16>,
/// The udp ipv6 port to broadcast to peers in order to reach back for discovery.
pub enr_udp6_port: Option<u16>,
pub enr_udp6_port: Option<NonZeroU16>,
/// The tcp ipv6 port to broadcast to peers in order to reach back for libp2p services.
pub enr_tcp6_port: Option<u16>,
pub enr_tcp6_port: Option<NonZeroU16>,
/// The quic ipv6 port to broadcast to peers in order to reach back for libp2p services.
pub enr_quic6_port: Option<u16>,
pub enr_quic6_port: Option<NonZeroU16>,
/// Target number of connected peers.
pub target_peers: usize,
@@ -304,10 +310,10 @@ impl Default for Config {
.expect("The total rate limit has been specified"),
);
let listen_addresses = ListenAddress::V4(ListenAddr {
addr: Ipv4Addr::UNSPECIFIED,
disc_port: 9000,
quic_port: 9001,
tcp_port: 9000,
addr: DEFAULT_IPV4_ADDRESS,
disc_port: DEFAULT_DISC_PORT,
quic_port: DEFAULT_QUIC_PORT,
tcp_port: DEFAULT_TCP_PORT,
});
let discv5_listen_config =

View File

@@ -158,11 +158,11 @@ pub fn create_enr_builder_from_config<T: EnrKey>(
}
if let Some(udp4_port) = config.enr_udp4_port {
builder.udp4(udp4_port);
builder.udp4(udp4_port.get());
}
if let Some(udp6_port) = config.enr_udp6_port {
builder.udp6(udp6_port);
builder.udp6(udp6_port.get());
}
if enable_libp2p {
@@ -171,35 +171,45 @@ pub fn create_enr_builder_from_config<T: EnrKey>(
// the related fields should only be added when both QUIC and libp2p are enabled
if !config.disable_quic_support {
// If we are listening on ipv4, add the quic ipv4 port.
if let Some(quic4_port) = config
.enr_quic4_port
.or_else(|| config.listen_addrs().v4().map(|v4_addr| v4_addr.quic_port))
{
builder.add_value(QUIC_ENR_KEY, &quic4_port);
if let Some(quic4_port) = config.enr_quic4_port.or_else(|| {
config
.listen_addrs()
.v4()
.and_then(|v4_addr| v4_addr.quic_port.try_into().ok())
}) {
builder.add_value(QUIC_ENR_KEY, &quic4_port.get());
}
// If we are listening on ipv6, add the quic ipv6 port.
if let Some(quic6_port) = config
.enr_quic6_port
.or_else(|| config.listen_addrs().v6().map(|v6_addr| v6_addr.quic_port))
{
builder.add_value(QUIC6_ENR_KEY, &quic6_port);
if let Some(quic6_port) = config.enr_quic6_port.or_else(|| {
config
.listen_addrs()
.v6()
.and_then(|v6_addr| v6_addr.quic_port.try_into().ok())
}) {
builder.add_value(QUIC6_ENR_KEY, &quic6_port.get());
}
}
// If the ENR port is not set, and we are listening over that ip version, use the listening port instead.
let tcp4_port = config
.enr_tcp4_port
.or_else(|| config.listen_addrs().v4().map(|v4_addr| v4_addr.tcp_port));
let tcp4_port = config.enr_tcp4_port.or_else(|| {
config
.listen_addrs()
.v4()
.and_then(|v4_addr| v4_addr.tcp_port.try_into().ok())
});
if let Some(tcp4_port) = tcp4_port {
builder.tcp4(tcp4_port);
builder.tcp4(tcp4_port.get());
}
let tcp6_port = config
.enr_tcp6_port
.or_else(|| config.listen_addrs().v6().map(|v6_addr| v6_addr.tcp_port));
let tcp6_port = config.enr_tcp6_port.or_else(|| {
config
.listen_addrs()
.v6()
.and_then(|v6_addr| v6_addr.tcp_port.try_into().ok())
});
if let Some(tcp6_port) = tcp6_port {
builder.tcp6(tcp6_port);
builder.tcp6(tcp6_port.get());
}
}
builder

View File

@@ -21,10 +21,11 @@ pub use libp2p::identity::{Keypair, PublicKey};
use enr::{ATTESTATION_BITFIELD_ENR_KEY, ETH2_ENR_KEY, SYNC_COMMITTEE_BITFIELD_ENR_KEY};
use futures::prelude::*;
use futures::stream::FuturesUnordered;
use libp2p::multiaddr::Protocol;
use libp2p::swarm::behaviour::{DialFailure, FromSwarm};
use libp2p::swarm::THandlerInEvent;
pub use libp2p::{
core::{ConnectedPoint, Multiaddr},
core::{transport::ListenerId, ConnectedPoint, Multiaddr},
identity::PeerId,
swarm::{
dummy::ConnectionHandler, ConnectionId, DialError, NetworkBehaviour, NotifyHandler,
@@ -77,6 +78,19 @@ pub struct DiscoveredPeers {
pub peers: HashMap<Enr, Option<Instant>>,
}
/// Specifies which port numbers should be modified after start of the discovery service
#[derive(Debug)]
pub struct UpdatePorts {
/// TCP port associated wih IPv4 address (if present)
pub tcp4: bool,
/// TCP port associated wih IPv6 address (if present)
pub tcp6: bool,
/// QUIC port associated wih IPv4 address (if present)
pub quic4: bool,
/// QUIC port associated wih IPv6 address (if present)
pub quic6: bool,
}
#[derive(Clone, PartialEq)]
struct SubnetQuery {
subnet: Subnet,
@@ -177,12 +191,8 @@ pub struct Discovery<TSpec: EthSpec> {
/// always false.
pub started: bool,
/// This keeps track of whether an external UDP port change should also indicate an internal
/// TCP port change. As we cannot detect our external TCP port, we assume that the external UDP
/// port is also our external TCP port. This assumption only holds if the user has not
/// explicitly set their ENR TCP port via the CLI config. The first indicates tcp4 and the
/// second indicates tcp6.
update_tcp_port: (bool, bool),
/// Specifies whether various port numbers should be updated after the discovery service has been started
update_ports: UpdatePorts,
/// Logger for the discovery behaviour.
log: slog::Logger,
@@ -300,10 +310,12 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}
}
let update_tcp_port = (
config.enr_tcp4_port.is_none(),
config.enr_tcp6_port.is_none(),
);
let update_ports = UpdatePorts {
tcp4: config.enr_tcp4_port.is_none(),
tcp6: config.enr_tcp6_port.is_none(),
quic4: config.enr_quic4_port.is_none(),
quic6: config.enr_quic6_port.is_none(),
};
Ok(Self {
cached_enrs: LruCache::new(50),
@@ -314,7 +326,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
discv5,
event_stream,
started: !config.disable_discovery,
update_tcp_port,
update_ports,
log,
enr_dir,
})
@@ -555,8 +567,6 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
if let Ok(node_id) = peer_id_to_node_id(peer_id) {
// If we could convert this peer id, remove it from the DHT and ban it from discovery.
self.discv5.ban_node(&node_id, None);
// Remove the node from the routing table.
self.discv5.remove_node(&node_id);
}
for ip_address in ip_addresses {
@@ -1006,8 +1016,8 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
// Discv5 will have updated our local ENR. We save the updated version
// to disk.
if (self.update_tcp_port.0 && socket_addr.is_ipv4())
|| (self.update_tcp_port.1 && socket_addr.is_ipv6())
if (self.update_ports.tcp4 && socket_addr.is_ipv4())
|| (self.update_ports.tcp6 && socket_addr.is_ipv6())
{
// Update the TCP port in the ENR
self.discv5.update_local_enr_socket(socket_addr, true);
@@ -1036,12 +1046,79 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
self.on_dial_failure(peer_id, error)
}
FromSwarm::NewListenAddr(ev) => {
let addr = ev.addr;
let listener_id = ev.listener_id;
trace!(self.log, "Received NewListenAddr event from swarm"; "listener_id" => ?listener_id, "addr" => ?addr);
let mut addr_iter = addr.iter();
let attempt_enr_update = match addr_iter.next() {
Some(Protocol::Ip4(_)) => match (addr_iter.next(), addr_iter.next()) {
(Some(Protocol::Tcp(port)), None) => {
if !self.update_ports.tcp4 {
debug!(self.log, "Skipping ENR update"; "multiaddr" => ?addr);
return;
}
self.update_enr_tcp_port(port)
}
(Some(Protocol::Udp(port)), Some(Protocol::QuicV1)) => {
if !self.update_ports.quic4 {
debug!(self.log, "Skipping ENR update"; "multiaddr" => ?addr);
return;
}
self.update_enr_quic_port(port)
}
_ => {
debug!(self.log, "Encountered unacceptable multiaddr for listening (unsupported transport)"; "addr" => ?addr);
return;
}
},
Some(Protocol::Ip6(_)) => match (addr_iter.next(), addr_iter.next()) {
(Some(Protocol::Tcp(port)), None) => {
if !self.update_ports.tcp6 {
debug!(self.log, "Skipping ENR update"; "multiaddr" => ?addr);
return;
}
self.update_enr_tcp_port(port)
}
(Some(Protocol::Udp(port)), Some(Protocol::QuicV1)) => {
if !self.update_ports.quic6 {
debug!(self.log, "Skipping ENR update"; "multiaddr" => ?addr);
return;
}
self.update_enr_quic_port(port)
}
_ => {
debug!(self.log, "Encountered unacceptable multiaddr for listening (unsupported transport)"; "addr" => ?addr);
return;
}
},
_ => {
debug!(self.log, "Encountered unacceptable multiaddr for listening (no IP)"; "addr" => ?addr);
return;
}
};
let local_enr: Enr = self.discv5.local_enr();
match attempt_enr_update {
Ok(_) => {
info!(self.log, "Updated local ENR"; "enr" => local_enr.to_base64(), "seq" => local_enr.seq(), "id"=> %local_enr.node_id(), "ip4" => ?local_enr.ip4(), "udp4"=> ?local_enr.udp4(), "tcp4" => ?local_enr.tcp4(), "tcp6" => ?local_enr.tcp6(), "udp6" => ?local_enr.udp6())
}
Err(e) => warn!(self.log, "Failed to update ENR"; "error" => ?e),
}
}
FromSwarm::ConnectionEstablished(_)
| FromSwarm::ConnectionClosed(_)
| FromSwarm::AddressChange(_)
| FromSwarm::ListenFailure(_)
| FromSwarm::NewListener(_)
| FromSwarm::NewListenAddr(_)
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::ListenerError(_)
| FromSwarm::ListenerClosed(_)

View File

@@ -415,7 +415,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// Reports if a peer is banned or not.
///
/// This is used to determine if we should accept incoming connections.
pub fn ban_status(&self, peer_id: &PeerId) -> BanResult {
pub fn ban_status(&self, peer_id: &PeerId) -> Option<BanResult> {
self.network_globals.peers.read().ban_status(peer_id)
}
@@ -815,7 +815,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
) -> bool {
{
let mut peerdb = self.network_globals.peers.write();
if !matches!(peerdb.ban_status(peer_id), BanResult::NotBanned) {
if peerdb.ban_status(peer_id).is_some() {
// don't connect if the peer is banned
error!(self.log, "Connection has been allowed to a banned peer"; "peer_id" => %peer_id);
}

View File

@@ -1,5 +1,6 @@
//! Implementation of [`NetworkBehaviour`] for the [`PeerManager`].
use std::net::IpAddr;
use std::task::{Context, Poll};
use futures::StreamExt;
@@ -8,17 +9,17 @@ use libp2p::identity::PeerId;
use libp2p::swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
use libp2p::swarm::dummy::ConnectionHandler;
use libp2p::swarm::{ConnectionId, NetworkBehaviour, PollParameters, ToSwarm};
use slog::{debug, error};
use libp2p::swarm::{ConnectionDenied, ConnectionId, NetworkBehaviour, PollParameters, ToSwarm};
use slog::{debug, error, trace};
use types::EthSpec;
use crate::discovery::enr_ext::EnrExt;
use crate::peer_manager::peerdb::BanResult;
use crate::rpc::GoodbyeReason;
use crate::types::SyncState;
use crate::{metrics, ClearDialError};
use super::peerdb::BanResult;
use super::{ConnectingType, PeerManager, PeerManagerEvent, ReportSource};
use super::{ConnectingType, PeerManager, PeerManagerEvent};
impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
type ConnectionHandler = ConnectionHandler;
@@ -169,26 +170,64 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
}
}
fn handle_pending_inbound_connection(
&mut self,
_connection_id: ConnectionId,
_local_addr: &libp2p::Multiaddr,
remote_addr: &libp2p::Multiaddr,
) -> Result<(), ConnectionDenied> {
// get the IP address to verify it's not banned.
let ip = match remote_addr.iter().next() {
Some(libp2p::multiaddr::Protocol::Ip6(ip)) => IpAddr::V6(ip),
Some(libp2p::multiaddr::Protocol::Ip4(ip)) => IpAddr::V4(ip),
_ => {
return Err(ConnectionDenied::new(format!(
"Connection to peer rejected: invalid multiaddr: {remote_addr}"
)))
}
};
if self.network_globals.peers.read().is_ip_banned(&ip) {
return Err(ConnectionDenied::new(format!(
"Connection to peer rejected: peer {ip} is banned"
)));
}
Ok(())
}
fn handle_established_inbound_connection(
&mut self,
_connection_id: ConnectionId,
_peer: PeerId,
peer_id: PeerId,
_local_addr: &libp2p::Multiaddr,
_remote_addr: &libp2p::Multiaddr,
) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
// TODO: we might want to check if we accept this peer or not in the future.
remote_addr: &libp2p::Multiaddr,
) -> Result<libp2p::swarm::THandler<Self>, ConnectionDenied> {
trace!(self.log, "Inbound connection"; "peer_id" => %peer_id, "multiaddr" => %remote_addr);
// We already checked if the peer was banned on `handle_pending_inbound_connection`.
if let Some(BanResult::BadScore) = self.ban_status(&peer_id) {
return Err(ConnectionDenied::new(
"Connection to peer rejected: peer has a bad score",
));
}
Ok(ConnectionHandler)
}
fn handle_established_outbound_connection(
&mut self,
_connection_id: ConnectionId,
_peer: PeerId,
_addr: &libp2p::Multiaddr,
peer_id: PeerId,
addr: &libp2p::Multiaddr,
_role_override: libp2p::core::Endpoint,
) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
// TODO: we might want to check if we accept this peer or not in the future.
Ok(ConnectionHandler)
trace!(self.log, "Outbound connection"; "peer_id" => %peer_id, "multiaddr" => %addr);
match self.ban_status(&peer_id) {
Some(cause) => {
error!(self.log, "Connected a banned peer. Rejecting connection"; "peer_id" => %peer_id);
Err(ConnectionDenied::new(cause))
}
None => Ok(ConnectionHandler),
}
}
}
@@ -215,10 +254,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// increment prometheus metrics
if self.metrics_enabled {
let remote_addr = match endpoint {
ConnectedPoint::Dialer { address, .. } => address,
ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
};
let remote_addr = endpoint.get_remote_address();
match remote_addr.iter().find(|proto| {
matches!(
proto,
@@ -241,28 +277,6 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
}
// Check to make sure the peer is not supposed to be banned
match self.ban_status(&peer_id) {
// TODO: directly emit the ban event?
BanResult::BadScore => {
// This is a faulty state
error!(self.log, "Connected to a banned peer. Re-banning"; "peer_id" => %peer_id);
// Disconnect the peer.
self.goodbye_peer(&peer_id, GoodbyeReason::Banned, ReportSource::PeerManager);
// Re-ban the peer to prevent repeated errors.
self.events.push(PeerManagerEvent::Banned(peer_id, vec![]));
return;
}
BanResult::BannedIp(ip_addr) => {
// A good peer has connected to us via a banned IP address. We ban the peer and
// prevent future connections.
debug!(self.log, "Peer connected via banned IP. Banning"; "peer_id" => %peer_id, "banned_ip" => %ip_addr);
self.goodbye_peer(&peer_id, GoodbyeReason::BannedIP, ReportSource::PeerManager);
return;
}
BanResult::NotBanned => {}
}
// Count dialing peers in the limit if the peer dialed us.
let count_dialing = endpoint.is_listener();
// Check the connection limits
@@ -326,11 +340,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// reference so that peer manager can track this peer.
self.inject_disconnect(&peer_id);
let remote_addr = match endpoint {
ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
ConnectedPoint::Dialer { address, .. } => address,
};
let remote_addr = endpoint.get_remote_address();
// Update the prometheus metrics
if self.metrics_enabled {
match remote_addr.iter().find(|proto| {

View File

@@ -3,10 +3,13 @@ use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
use rand::seq::SliceRandom;
use score::{PeerAction, ReportSource, Score, ScoreState};
use slog::{crit, debug, error, trace, warn};
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::net::IpAddr;
use std::time::Instant;
use std::{cmp::Ordering, fmt::Display};
use std::{
collections::{HashMap, HashSet},
fmt::Formatter,
};
use sync_status::SyncStatus;
use types::EthSpec;
@@ -136,26 +139,18 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
}
}
/// Returns the current [`BanResult`] of the peer. This doesn't check the connection state, rather the
/// Returns the current [`BanResult`] of the peer if banned. This doesn't check the connection state, rather the
/// underlying score of the peer. A peer may be banned but still in the connected state
/// temporarily.
///
/// This is used to determine if we should accept incoming connections or not.
pub fn ban_status(&self, peer_id: &PeerId) -> BanResult {
if let Some(peer) = self.peers.get(peer_id) {
match peer.score_state() {
ScoreState::Banned => BanResult::BadScore,
_ => {
if let Some(ip) = self.ip_is_banned(peer) {
BanResult::BannedIp(ip)
} else {
BanResult::NotBanned
}
}
}
} else {
BanResult::NotBanned
}
pub fn ban_status(&self, peer_id: &PeerId) -> Option<BanResult> {
self.peers
.get(peer_id)
.and_then(|peer| match peer.score_state() {
ScoreState::Banned => Some(BanResult::BadScore),
_ => self.ip_is_banned(peer).map(BanResult::BannedIp),
})
}
/// Checks if the peer's known addresses are currently banned.
@@ -1183,23 +1178,25 @@ pub enum BanOperation {
}
/// When checking if a peer is banned, it can be banned for multiple reasons.
#[derive(Copy, Clone, Debug)]
pub enum BanResult {
/// The peer's score is too low causing it to be banned.
BadScore,
/// The peer should be banned because it is connecting from a banned IP address.
BannedIp(IpAddr),
/// The peer is not banned.
NotBanned,
}
// Helper function for unit tests
#[cfg(test)]
impl BanResult {
pub fn is_banned(&self) -> bool {
!matches!(self, BanResult::NotBanned)
impl Display for BanResult {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
BanResult::BadScore => write!(f, "Peer has a bad score"),
BanResult::BannedIp(addr) => write!(f, "Peer address: {} is banned", addr),
}
}
}
impl std::error::Error for BanResult {}
#[derive(Default)]
pub struct BannedPeersCount {
/// The number of banned peers in the database.
@@ -1852,11 +1849,11 @@ mod tests {
}
//check that ip1 and ip2 are banned but ip3-5 not
assert!(pdb.ban_status(&p1).is_banned());
assert!(pdb.ban_status(&p2).is_banned());
assert!(!pdb.ban_status(&p3).is_banned());
assert!(!pdb.ban_status(&p4).is_banned());
assert!(!pdb.ban_status(&p5).is_banned());
assert!(pdb.ban_status(&p1).is_some());
assert!(pdb.ban_status(&p2).is_some());
assert!(pdb.ban_status(&p3).is_none());
assert!(pdb.ban_status(&p4).is_none());
assert!(pdb.ban_status(&p5).is_none());
//ban also the last peer in peers
let _ = pdb.report_peer(
@@ -1868,11 +1865,11 @@ mod tests {
pdb.inject_disconnect(&peers[BANNED_PEERS_PER_IP_THRESHOLD + 1]);
//check that ip1-ip4 are banned but ip5 not
assert!(pdb.ban_status(&p1).is_banned());
assert!(pdb.ban_status(&p2).is_banned());
assert!(pdb.ban_status(&p3).is_banned());
assert!(pdb.ban_status(&p4).is_banned());
assert!(!pdb.ban_status(&p5).is_banned());
assert!(pdb.ban_status(&p1).is_some());
assert!(pdb.ban_status(&p2).is_some());
assert!(pdb.ban_status(&p3).is_some());
assert!(pdb.ban_status(&p4).is_some());
assert!(pdb.ban_status(&p5).is_none());
//peers[0] gets unbanned
reset_score(&mut pdb, &peers[0]);
@@ -1880,11 +1877,11 @@ mod tests {
let _ = pdb.shrink_to_fit();
//nothing changed
assert!(pdb.ban_status(&p1).is_banned());
assert!(pdb.ban_status(&p2).is_banned());
assert!(pdb.ban_status(&p3).is_banned());
assert!(pdb.ban_status(&p4).is_banned());
assert!(!pdb.ban_status(&p5).is_banned());
assert!(pdb.ban_status(&p1).is_some());
assert!(pdb.ban_status(&p2).is_some());
assert!(pdb.ban_status(&p3).is_some());
assert!(pdb.ban_status(&p4).is_some());
assert!(pdb.ban_status(&p5).is_none());
//peers[1] gets unbanned
reset_score(&mut pdb, &peers[1]);
@@ -1892,11 +1889,11 @@ mod tests {
let _ = pdb.shrink_to_fit();
//all ips are unbanned
assert!(!pdb.ban_status(&p1).is_banned());
assert!(!pdb.ban_status(&p2).is_banned());
assert!(!pdb.ban_status(&p3).is_banned());
assert!(!pdb.ban_status(&p4).is_banned());
assert!(!pdb.ban_status(&p5).is_banned());
assert!(pdb.ban_status(&p1).is_none());
assert!(pdb.ban_status(&p2).is_none());
assert!(pdb.ban_status(&p3).is_none());
assert!(pdb.ban_status(&p4).is_none());
assert!(pdb.ban_status(&p5).is_none());
}
#[test]
@@ -1921,8 +1918,8 @@ mod tests {
}
// check ip is banned
assert!(pdb.ban_status(&p1).is_banned());
assert!(!pdb.ban_status(&p2).is_banned());
assert!(pdb.ban_status(&p1).is_some());
assert!(pdb.ban_status(&p2).is_none());
// unban a peer
reset_score(&mut pdb, &peers[0]);
@@ -1930,8 +1927,8 @@ mod tests {
let _ = pdb.shrink_to_fit();
// check not banned anymore
assert!(!pdb.ban_status(&p1).is_banned());
assert!(!pdb.ban_status(&p2).is_banned());
assert!(pdb.ban_status(&p1).is_none());
assert!(pdb.ban_status(&p2).is_none());
// unban all peers
for p in &peers {
@@ -1950,8 +1947,8 @@ mod tests {
}
// both IP's are now banned
assert!(pdb.ban_status(&p1).is_banned());
assert!(pdb.ban_status(&p2).is_banned());
assert!(pdb.ban_status(&p1).is_some());
assert!(pdb.ban_status(&p2).is_some());
// unban all peers
for p in &peers {
@@ -1967,16 +1964,16 @@ mod tests {
}
// nothing is banned
assert!(!pdb.ban_status(&p1).is_banned());
assert!(!pdb.ban_status(&p2).is_banned());
assert!(pdb.ban_status(&p1).is_none());
assert!(pdb.ban_status(&p2).is_none());
// reban last peer
let _ = pdb.report_peer(&peers[0], PeerAction::Fatal, ReportSource::PeerManager, "");
pdb.inject_disconnect(&peers[0]);
//Ip's are banned again
assert!(pdb.ban_status(&p1).is_banned());
assert!(pdb.ban_status(&p2).is_banned());
assert!(pdb.ban_status(&p1).is_some());
assert!(pdb.ban_status(&p2).is_some());
}
#[test]

View File

@@ -20,8 +20,6 @@ where
AppReqId: ReqId,
TSpec: EthSpec,
{
/// Peers banned.
pub banned_peers: libp2p::allow_block_list::Behaviour<libp2p::allow_block_list::BlockedPeers>,
/// Keep track of active and pending connections to enforce hard limits.
pub connection_limits: libp2p::connection_limits::Behaviour,
/// The routing pub-sub mechanism for eth2.

View File

@@ -27,6 +27,7 @@ use gossipsub_scoring_parameters::{lighthouse_gossip_thresholds, PeerScoreSettin
use libp2p::bandwidth::BandwidthSinks;
use libp2p::gossipsub::{
self, IdentTopic as Topic, MessageAcceptance, MessageAuthenticity, MessageId, PublishError,
TopicScoreParams,
};
use libp2p::identify;
use libp2p::multiaddr::{Multiaddr, Protocol as MProtocol};
@@ -353,11 +354,8 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
libp2p::connection_limits::Behaviour::new(limits)
};
let banned_peers = libp2p::allow_block_list::Behaviour::default();
let behaviour = {
Behaviour {
banned_peers,
gossipsub,
eth2_rpc,
discovery,
@@ -637,6 +635,38 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
}
}
/// Remove topic weight from all topics that don't have the given fork digest.
pub fn remove_topic_weight_except(&mut self, except: [u8; 4]) {
let new_param = TopicScoreParams {
topic_weight: 0.0,
..Default::default()
};
let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone();
for topic in subscriptions
.iter()
.filter(|topic| topic.fork_digest != except)
{
let libp2p_topic: Topic = topic.clone().into();
match self
.gossipsub_mut()
.set_topic_params(libp2p_topic, new_param.clone())
{
Ok(_) => debug!(self.log, "Removed topic weight"; "topic" => %topic),
Err(e) => {
warn!(self.log, "Failed to remove topic weight"; "topic" => %topic, "error" => e)
}
}
}
}
/// Returns the scoring parameters for a topic if set.
pub fn get_topic_params(&self, topic: GossipTopic) -> Option<&TopicScoreParams> {
self.swarm
.behaviour()
.gossipsub
.get_topic_params(&topic.into())
}
/// Subscribes to a gossipsub topic.
///
/// Returns `true` if the subscription was successful and `false` otherwise.
@@ -1445,15 +1475,10 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
Some(NetworkEvent::PeerDisconnected(peer_id))
}
PeerManagerEvent::Banned(peer_id, associated_ips) => {
self.swarm.behaviour_mut().banned_peers.block_peer(peer_id);
self.discovery_mut().ban_peer(&peer_id, associated_ips);
None
}
PeerManagerEvent::UnBanned(peer_id, associated_ips) => {
self.swarm
.behaviour_mut()
.banned_peers
.unblock_peer(peer_id);
self.discovery_mut().unban_peer(&peer_id, associated_ips);
None
}
@@ -1502,7 +1527,6 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
let maybe_event = match swarm_event {
SwarmEvent::Behaviour(behaviour_event) => match behaviour_event {
// Handle sub-behaviour events.
BehaviourEvent::BannedPeers(void) => void::unreachable(void),
BehaviourEvent::Gossipsub(ge) => self.inject_gs_event(ge),
BehaviourEvent::Eth2Rpc(re) => self.inject_rpc_event(re),
// Inform the peer manager about discovered peers.