remove nat module and use libp2p upnp (#4840)

* remove nat module and use libp2p upnp

* update Cargo.lock

* remove no longer used dependencies

* restore nat module refactored

* log successful mapping

* only activate upnp if config enabled

reduce logs to debug!

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into libp2p-nat

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into libp2p-nat

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into libp2p-nat

* address review

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into libp2p-nat

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into libp2p-nat

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into libp2p-nat

* address review
This commit is contained in:
João Oliveira
2024-02-27 07:29:18 +00:00
committed by GitHub
parent d36241b4a1
commit abd99652b4
9 changed files with 146 additions and 318 deletions

View File

@@ -3,231 +3,58 @@
//! Currently supported strategies:
//! - UPnP
use crate::{NetworkConfig, NetworkMessage};
use if_addrs::get_if_addrs;
use slog::{debug, info};
use std::net::{IpAddr, SocketAddr, SocketAddrV4};
use tokio::sync::mpsc;
use types::EthSpec;
use anyhow::{bail, Context, Error};
use igd_next::{aio::tokio as igd, PortMappingProtocol};
use slog::debug;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
use tokio::time::sleep;
/// Configuration required to construct the UPnP port mappings.
pub struct UPnPConfig {
/// The local TCP port.
tcp_port: u16,
/// The local UDP discovery port.
disc_port: u16,
/// The local UDP quic port.
quic_port: u16,
/// Whether discovery is enabled or not.
disable_discovery: bool,
/// Whether quic is enabled or not.
disable_quic_support: bool,
}
/// The duration in seconds of a port mapping on the gateway.
const MAPPING_DURATION: u32 = 3600;
/// Contains mappings that managed to be established.
#[derive(Default, Debug)]
pub struct EstablishedUPnPMappings {
/// A TCP port mapping for libp2p.
pub tcp_port: Option<u16>,
/// A UDP port for the QUIC libp2p transport.
pub udp_quic_port: Option<u16>,
/// A UDP port for discv5.
pub udp_disc_port: Option<u16>,
}
/// Renew the Mapping every half of `MAPPING_DURATION` to avoid the port being unmapped.
const MAPPING_TIMEOUT: u64 = MAPPING_DURATION as u64 / 2;
impl EstablishedUPnPMappings {
/// Returns true if at least one value is set.
pub fn is_some(&self) -> bool {
self.tcp_port.is_some() || self.udp_quic_port.is_some() || self.udp_disc_port.is_some()
}
// Iterator over the UDP ports
pub fn udp_ports(&self) -> impl Iterator<Item = &u16> {
self.udp_quic_port.iter().chain(self.udp_disc_port.iter())
}
}
impl UPnPConfig {
pub fn from_config(config: &NetworkConfig) -> Option<Self> {
config.listen_addrs().v4().map(|v4_addr| UPnPConfig {
tcp_port: v4_addr.tcp_port,
disc_port: v4_addr.disc_port,
quic_port: v4_addr.quic_port,
disable_discovery: config.disable_discovery,
disable_quic_support: config.disable_quic_support,
})
}
}
/// Attempts to construct external port mappings with UPnP.
pub fn construct_upnp_mappings<T: EthSpec>(
config: UPnPConfig,
network_send: mpsc::UnboundedSender<NetworkMessage<T>>,
/// Attempts to map Discovery external port mappings with UPnP.
pub async fn construct_upnp_mappings(
addr: Ipv4Addr,
port: u16,
log: slog::Logger,
) {
info!(log, "UPnP Attempting to initialise routes");
match igd_next::search_gateway(Default::default()) {
Err(e) => info!(log, "UPnP not available"; "error" => %e),
Ok(gateway) => {
// Need to find the local listening address matched with the router subnet
let interfaces = match get_if_addrs() {
Ok(v) => v,
Err(e) => {
info!(log, "UPnP failed to get local interfaces"; "error" => %e);
return;
}
};
let local_ip = interfaces.iter().find_map(|interface| {
// Just use the first IP of the first interface that is not a loopback and not an
// ipv6 address.
if !interface.is_loopback() {
interface.ip().is_ipv4().then(|| interface.ip())
} else {
None
}
});
) -> Result<(), Error> {
let gateway = igd::search_gateway(Default::default())
.await
.context("Gateway does not support UPnP")?;
let local_ip = match local_ip {
None => {
info!(log, "UPnP failed to find local IP address");
return;
}
Some(v) => v,
};
let external_address = gateway
.get_external_ip()
.await
.context("Could not access gateway's external ip")?;
debug!(log, "UPnP Local IP Discovered"; "ip" => ?local_ip);
let mut mappings = EstablishedUPnPMappings::default();
match local_ip {
IpAddr::V4(address) => {
let libp2p_socket = SocketAddrV4::new(address, config.tcp_port);
let external_ip = gateway.get_external_ip();
// We add specific port mappings rather than getting the router to arbitrary assign
// one.
// I've found this to be more reliable. If multiple users are behind a single
// router, they should ideally try to set different port numbers.
mappings.tcp_port = add_port_mapping(
&gateway,
igd_next::PortMappingProtocol::TCP,
libp2p_socket,
"tcp",
&log,
).map(|_| {
let external_socket = external_ip.as_ref().map(|ip| SocketAddr::new(*ip, config.tcp_port)).map_err(|_| ());
info!(log, "UPnP TCP route established"; "external_socket" => format!("{}:{}", external_socket.as_ref().map(|ip| ip.to_string()).unwrap_or_else(|_| "".into()), config.tcp_port));
config.tcp_port
}).ok();
let set_udp_mapping = |udp_port| {
let udp_socket = SocketAddrV4::new(address, udp_port);
add_port_mapping(
&gateway,
igd_next::PortMappingProtocol::UDP,
udp_socket,
"udp",
&log,
).map(|_| {
info!(log, "UPnP UDP route established"; "external_socket" => format!("{}:{}", external_ip.as_ref().map(|ip| ip.to_string()).unwrap_or_else(|_| "".into()), udp_port));
})
};
// Set the discovery UDP port mapping
if !config.disable_discovery && set_udp_mapping(config.disc_port).is_ok() {
mappings.udp_disc_port = Some(config.disc_port);
}
// Set the quic UDP port mapping
if !config.disable_quic_support && set_udp_mapping(config.quic_port).is_ok() {
mappings.udp_quic_port = Some(config.quic_port);
}
// report any updates to the network service.
if mappings.is_some() {
network_send.send(NetworkMessage::UPnPMappingEstablished{ mappings })
.unwrap_or_else(|e| debug!(log, "Could not send message to the network service"; "error" => %e));
}
}
_ => debug!(log, "UPnP no routes constructed. IPv6 not supported"),
}
}
let is_private = match external_address {
IpAddr::V4(ipv4) => ipv4.is_private(),
IpAddr::V6(ipv6) => ipv6.is_loopback() || ipv6.is_unspecified(),
};
}
/// Sets up a port mapping for a protocol returning the mapped port if successful.
fn add_port_mapping(
gateway: &igd_next::Gateway,
protocol: igd_next::PortMappingProtocol,
socket: SocketAddrV4,
protocol_string: &'static str,
log: &slog::Logger,
) -> Result<(), ()> {
// We add specific port mappings rather than getting the router to arbitrary assign
// one.
// I've found this to be more reliable. If multiple users are behind a single
// router, they should ideally try to set different port numbers.
let mapping_string = &format!("lighthouse-{}", protocol_string);
for _ in 0..2 {
match gateway.add_port(
protocol,
socket.port(),
SocketAddr::V4(socket),
0,
mapping_string,
) {
Err(e) => {
match e {
igd_next::AddPortError::PortInUse => {
// Try and remove and re-create
debug!(log, "UPnP port in use, attempting to remap"; "protocol" => protocol_string, "port" => socket.port());
match gateway.remove_port(protocol, socket.port()) {
Ok(()) => {
debug!(log, "UPnP Removed port mapping"; "protocol" => protocol_string, "port" => socket.port())
}
Err(e) => {
debug!(log, "UPnP Port remove failure"; "protocol" => protocol_string, "port" => socket.port(), "error" => %e);
return Err(());
}
}
}
e => {
info!(log, "UPnP TCP route not set"; "error" => %e);
return Err(());
}
}
}
Ok(_) => {
return Ok(());
}
}
if is_private {
bail!(
"Gateway's external address is a private address: {}",
external_address
);
}
Err(())
}
/// Removes the specified TCP and UDP port mappings.
pub fn remove_mappings(mappings: &EstablishedUPnPMappings, log: &slog::Logger) {
if mappings.is_some() {
debug!(log, "Removing UPnP port mappings");
match igd_next::search_gateway(Default::default()) {
Ok(gateway) => {
if let Some(tcp_port) = mappings.tcp_port {
match gateway.remove_port(igd_next::PortMappingProtocol::TCP, tcp_port) {
Ok(()) => debug!(log, "UPnP Removed TCP port mapping"; "port" => tcp_port),
Err(e) => {
debug!(log, "UPnP Failed to remove TCP port mapping"; "port" => tcp_port, "error" => %e)
}
}
}
for udp_port in mappings.udp_ports() {
match gateway.remove_port(igd_next::PortMappingProtocol::UDP, *udp_port) {
Ok(()) => debug!(log, "UPnP Removed UDP port mapping"; "port" => udp_port),
Err(e) => {
debug!(log, "UPnP Failed to remove UDP port mapping"; "port" => udp_port, "error" => %e)
}
}
}
}
Err(e) => debug!(log, "UPnP failed to remove mappings"; "error" => %e),
}
loop {
gateway
.add_port(
PortMappingProtocol::UDP,
port,
SocketAddr::new(IpAddr::V4(addr), port),
MAPPING_DURATION,
"Lighthouse Discovery port",
)
.await
.with_context(|| format!("Could not UPnP map port: {} on the gateway", port))?;
debug!(log, "Discovery UPnP port mapped"; "port" => %port);
sleep(Duration::from_secs(MAPPING_TIMEOUT)).await;
}
}

View File

@@ -1,5 +1,5 @@
use super::sync::manager::RequestId as SyncId;
use crate::nat::EstablishedUPnPMappings;
use crate::nat;
use crate::network_beacon_processor::InvalidBlockStorage;
use crate::persisted_dht::{clear_dht, load_dht, persist_dht};
use crate::router::{Router, RouterMessage};
@@ -94,11 +94,6 @@ pub enum NetworkMessage<T: EthSpec> {
/// The result of the validation
validation_result: MessageAcceptance,
},
/// Called if UPnP managed to establish an external port mapping.
UPnPMappingEstablished {
/// The mappings that were established.
mappings: EstablishedUPnPMappings,
},
/// Reports a peer to the peer manager for performing an action.
ReportPeer {
peer_id: PeerId,
@@ -188,9 +183,6 @@ pub struct NetworkService<T: BeaconChainTypes> {
store: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
/// A collection of global variables, accessible outside of the network service.
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
/// Stores potentially created UPnP mappings to be removed on shutdown. (TCP port and UDP
/// ports).
upnp_mappings: EstablishedUPnPMappings,
/// A delay that expires when a new fork takes place.
next_fork_update: Pin<Box<OptionFuture<Sleep>>>,
/// A delay that expires when we need to subscribe to a new fork's topics.
@@ -237,22 +229,24 @@ impl<T: BeaconChainTypes> NetworkService<T> {
"Backfill is disabled. DO NOT RUN IN PRODUCTION"
);
// try and construct UPnP port mappings if required.
if let Some(upnp_config) = crate::nat::UPnPConfig::from_config(config) {
let upnp_log = network_log.new(o!("service" => "UPnP"));
let upnp_network_send = network_senders.network_send();
if config.upnp_enabled {
executor.spawn_blocking(
move || {
crate::nat::construct_upnp_mappings(
upnp_config,
upnp_network_send,
upnp_log,
)
},
"UPnP",
);
}
if let (true, false, Some(v4)) = (
config.upnp_enabled,
config.disable_discovery,
config.listen_addrs().v4(),
) {
let nw = network_log.clone();
let v4 = v4.clone();
executor.spawn(
async move {
info!(nw, "UPnP Attempting to initialise routes");
if let Err(e) =
nat::construct_upnp_mappings(v4.addr, v4.disc_port, nw.clone()).await
{
info!(nw, "Could not UPnP map Discovery port"; "error" => %e);
}
},
"UPnP",
);
}
// get a reference to the beacon chain store
@@ -358,7 +352,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
router_send,
store,
network_globals: network_globals.clone(),
upnp_mappings: EstablishedUPnPMappings::default(),
next_fork_update,
next_fork_subscriptions,
next_unsubscribe,
@@ -636,21 +629,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
} => {
self.libp2p.send_error_response(peer_id, id, error, reason);
}
NetworkMessage::UPnPMappingEstablished { mappings } => {
self.upnp_mappings = mappings;
// If there is an external TCP port update, modify our local ENR.
if let Some(tcp_port) = self.upnp_mappings.tcp_port {
if let Err(e) = self.libp2p.discovery_mut().update_enr_tcp_port(tcp_port) {
warn!(self.log, "Failed to update ENR"; "error" => e);
}
}
// If there is an external QUIC port update, modify our local ENR.
if let Some(quic_port) = self.upnp_mappings.udp_quic_port {
if let Err(e) = self.libp2p.discovery_mut().update_enr_quic_port(quic_port) {
warn!(self.log, "Failed to update ENR"; "error" => e);
}
}
}
NetworkMessage::ValidationResult {
propagation_source,
message_id,
@@ -1009,10 +987,6 @@ impl<T: BeaconChainTypes> Drop for NetworkService<T> {
"Saved DHT state";
),
}
// attempt to remove port mappings
crate::nat::remove_mappings(&self.upnp_mappings, &self.log);
info!(self.log, "Network service shutdown");
}
}