Add Experimental QUIC support (#4577)

## Issue Addressed

#4402 

## Proposed Changes

This PR adds QUIC support to Lighthouse. As this is not officially spec'd this will only work between lighthouse <-> lighthouse connections. We attempt a QUIC connection (if the node advertises it) and if it fails we fallback to TCP. 

This should be a backwards compatible modification. We want to test this functionality on live networks to observe any improvements in bandwidth/latency.

NOTE: This also removes the websockets transport as I believe no one is really using it. It should be mentioned in our release however.


Co-authored-by: João Oliveira <hello@jxs.pt>
This commit is contained in:
Age Manning
2023-09-15 03:07:24 +00:00
parent 35f47f454f
commit e4ed317b76
35 changed files with 1161 additions and 752 deletions

View File

@@ -21,7 +21,6 @@ pub use libp2p::identity::{Keypair, PublicKey};
use enr::{ATTESTATION_BITFIELD_ENR_KEY, ETH2_ENR_KEY, SYNC_COMMITTEE_BITFIELD_ENR_KEY};
use futures::prelude::*;
use futures::stream::FuturesUnordered;
use libp2p::multiaddr::Protocol;
use libp2p::swarm::behaviour::{DialFailure, FromSwarm};
use libp2p::swarm::THandlerInEvent;
pub use libp2p::{
@@ -75,7 +74,7 @@ const DURATION_DIFFERENCE: Duration = Duration::from_millis(1);
/// of the peer if it is specified.
#[derive(Debug)]
pub struct DiscoveredPeers {
pub peers: HashMap<PeerId, Option<Instant>>,
pub peers: HashMap<Enr, Option<Instant>>,
}
#[derive(Clone, PartialEq)]
@@ -208,7 +207,8 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
let local_node_id = local_enr.node_id();
info!(log, "ENR Initialised"; "enr" => local_enr.to_base64(), "seq" => local_enr.seq(), "id"=> %local_enr.node_id(),
"ip4" => ?local_enr.ip4(), "udp4"=> ?local_enr.udp4(), "tcp4" => ?local_enr.tcp4(), "tcp6" => ?local_enr.tcp6(), "udp6" => ?local_enr.udp6()
"ip4" => ?local_enr.ip4(), "udp4"=> ?local_enr.udp4(), "tcp4" => ?local_enr.tcp4(), "tcp6" => ?local_enr.tcp6(), "udp6" => ?local_enr.udp6(),
"quic4" => ?local_enr.quic4(), "quic6" => ?local_enr.quic6()
);
// convert the keypair into an ENR key
@@ -230,7 +230,8 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
"peer_id" => %bootnode_enr.peer_id(),
"ip" => ?bootnode_enr.ip4(),
"udp" => ?bootnode_enr.udp4(),
"tcp" => ?bootnode_enr.tcp4()
"tcp" => ?bootnode_enr.tcp4(),
"quic" => ?bootnode_enr.quic4()
);
let repr = bootnode_enr.to_string();
let _ = discv5.add_enr(bootnode_enr).map_err(|e| {
@@ -281,7 +282,8 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
"peer_id" => %enr.peer_id(),
"ip" => ?enr.ip4(),
"udp" => ?enr.udp4(),
"tcp" => ?enr.tcp4()
"tcp" => ?enr.tcp4(),
"quic" => ?enr.quic4()
);
let _ = discv5.add_enr(enr).map_err(|e| {
error!(
@@ -383,20 +385,6 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
self.discv5.table_entries_enr()
}
/// Returns the ENR of a known peer if it exists.
pub fn enr_of_peer(&mut self, peer_id: &PeerId) -> Option<Enr> {
// first search the local cache
if let Some(enr) = self.cached_enrs.get(peer_id) {
return Some(enr.clone());
}
// not in the local cache, look in the routing table
if let Ok(node_id) = enr_ext::peer_id_to_node_id(peer_id) {
self.discv5.find_enr(&node_id)
} else {
None
}
}
/// Updates the local ENR TCP port.
/// There currently isn't a case to update the address here. We opt for discovery to
/// automatically update the external address.
@@ -414,6 +402,23 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
Ok(())
}
// TODO: Group these functions here once the ENR is shared across discv5 and lighthouse and
// Lighthouse can modify the ENR directly.
// This currently doesn't support ipv6. All of these functions should be removed and
// addressed properly in the following issue.
// https://github.com/sigp/lighthouse/issues/4706
pub fn update_enr_quic_port(&mut self, port: u16) -> Result<(), String> {
self.discv5
.enr_insert("quic", &port)
.map_err(|e| format!("{:?}", e))?;
// replace the global version
*self.network_globals.local_enr.write() = self.discv5.local_enr();
// persist modified enr to disk
enr::save_enr_to_disk(Path::new(&self.enr_dir), &self.local_enr(), &self.log);
Ok(())
}
/// Updates the local ENR UDP socket.
///
/// This is with caution. Discovery should automatically maintain this. This should only be
@@ -733,23 +738,6 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
target_peers: usize,
additional_predicate: impl Fn(&Enr) -> bool + Send + 'static,
) {
// Make sure there are subnet queries included
let contains_queries = match &query {
QueryType::Subnet(queries) => !queries.is_empty(),
QueryType::FindPeers => true,
};
if !contains_queries {
debug!(
self.log,
"No subnets included in this request. Skipping discovery request."
);
return;
}
// Generate a random target node id.
let random_node = NodeId::random();
let enr_fork_id = match self.local_enr().eth2() {
Ok(v) => v,
Err(e) => {
@@ -773,7 +761,8 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
// Build the future
let query_future = self
.discv5
.find_node_predicate(random_node, predicate, target_peers)
// Generate a random target node id.
.find_node_predicate(NodeId::random(), predicate, target_peers)
.map(|v| QueryResult {
query_type: query,
result: v,
@@ -787,7 +776,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
fn process_completed_queries(
&mut self,
query: QueryResult,
) -> Option<HashMap<PeerId, Option<Instant>>> {
) -> Option<HashMap<Enr, Option<Instant>>> {
match query.query_type {
QueryType::FindPeers => {
self.find_peer_active = false;
@@ -797,12 +786,14 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}
Ok(r) => {
debug!(self.log, "Discovery query completed"; "peers_found" => r.len());
let mut results: HashMap<_, Option<Instant>> = HashMap::new();
r.iter().for_each(|enr| {
// cache the found ENR's
self.cached_enrs.put(enr.peer_id(), enr.clone());
results.insert(enr.peer_id(), None);
});
let results = r
.into_iter()
.map(|enr| {
// cache the found ENR's
self.cached_enrs.put(enr.peer_id(), enr.clone());
(enr, None)
})
.collect();
return Some(results);
}
Err(e) => {
@@ -850,17 +841,17 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
let subnet_predicate =
subnet_predicate::<TSpec>(vec![query.subnet], &self.log);
r.iter()
r.clone()
.into_iter()
.filter(|enr| subnet_predicate(enr))
.map(|enr| enr.peer_id())
.for_each(|peer_id| {
.for_each(|enr| {
if let Some(v) = metrics::get_int_counter(
&metrics::SUBNET_PEERS_FOUND,
&[query_str],
) {
v.inc();
}
let other_min_ttl = mapped_results.get_mut(&peer_id);
let other_min_ttl = mapped_results.get_mut(&enr);
// map peer IDs to the min_ttl furthest in the future
match (query.min_ttl, other_min_ttl) {
@@ -878,15 +869,11 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}
// update the mapping if we have a specified min_ttl
(Some(min_ttl), Some(None)) => {
mapped_results.insert(peer_id, Some(min_ttl));
mapped_results.insert(enr, Some(min_ttl));
}
// first seen min_ttl for this enr
(Some(min_ttl), None) => {
mapped_results.insert(peer_id, Some(min_ttl));
}
// first seen min_ttl for this enr
(None, None) => {
mapped_results.insert(peer_id, None);
(min_ttl, None) => {
mapped_results.insert(enr, min_ttl);
}
(None, Some(Some(_))) => {} // Don't replace the existing specific min_ttl
(None, Some(None)) => {} // No-op because this is a duplicate
@@ -910,7 +897,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}
/// Drives the queries returning any results from completed queries.
fn poll_queries(&mut self, cx: &mut Context) -> Option<HashMap<PeerId, Option<Instant>>> {
fn poll_queries(&mut self, cx: &mut Context) -> Option<HashMap<Enr, Option<Instant>>> {
while let Poll::Ready(Some(query_result)) = self.active_queries.poll_next_unpin(cx) {
let result = self.process_completed_queries(query_result);
if result.is_some() {
@@ -957,23 +944,6 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
) {
}
fn handle_pending_outbound_connection(
&mut self,
_connection_id: ConnectionId,
maybe_peer: Option<PeerId>,
_addresses: &[Multiaddr],
_effective_role: libp2p::core::Endpoint,
) -> Result<Vec<Multiaddr>, libp2p::swarm::ConnectionDenied> {
if let Some(enr) = maybe_peer.and_then(|peer_id| self.enr_of_peer(&peer_id)) {
// ENR's may have multiple Multiaddrs. The multi-addr associated with the UDP
// port is removed, which is assumed to be associated with the discv5 protocol (and
// therefore irrelevant for other libp2p components).
Ok(enr.multiaddr_tcp())
} else {
Ok(vec![])
}
}
// Main execution loop to drive the behaviour
fn poll(
&mut self,
@@ -1047,25 +1017,8 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
// update network globals
*self.network_globals.local_enr.write() = enr;
// A new UDP socket has been detected.
// Build a multiaddr to report to libp2p
let addr = match socket_addr.ip() {
IpAddr::V4(v4_addr) => {
self.network_globals.listen_port_tcp4().map(|tcp4_port| {
Multiaddr::from(v4_addr).with(Protocol::Tcp(tcp4_port))
})
}
IpAddr::V6(v6_addr) => {
self.network_globals.listen_port_tcp6().map(|tcp6_port| {
Multiaddr::from(v6_addr).with(Protocol::Tcp(tcp6_port))
})
}
};
if let Some(address) = addr {
// NOTE: This doesn't actually track the external TCP port. More sophisticated NAT handling
// should handle this.
return Poll::Ready(ToSwarm::NewExternalAddrCandidate(address));
}
// NOTE: We assume libp2p itself can keep track of IP changes and we do
// not inform it about IP changes found via discovery.
}
Discv5Event::EnrAdded { .. }
| Discv5Event::TalkRequest(_)
@@ -1152,8 +1105,6 @@ mod tests {
let log = build_log(slog::Level::Debug, false);
let globals = NetworkGlobals::new(
enr,
Some(9000),
None,
MetaData::V2(MetaDataV2 {
seq_number: 0,
attnets: Default::default(),
@@ -1261,6 +1212,6 @@ mod tests {
assert_eq!(results.len(), 2);
// when a peer belongs to multiple subnet ids, we use the highest ttl.
assert_eq!(results.get(&enr1.peer_id()).unwrap(), &instant1);
assert_eq!(results.get(&enr1).unwrap(), &instant1);
}
}