mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-20 21:34:46 +00:00
libp2p upgrade + gossipsub interval fix (#3012)
## Issue Addressed Lighthouse gossiping late messages ## Proposed Changes Point LH to our fork using tokio interval, which 1) works as expected 2) is more performant than the previous version that actually worked as expected Upgrade libp2p ## Additional Info https://github.com/libp2p/rust-libp2p/issues/2497
This commit is contained in:
@@ -38,10 +38,12 @@ directory = { path = "../../common/directory" }
|
||||
regex = "1.3.9"
|
||||
strum = { version = "0.21.0", features = ["derive"] }
|
||||
superstruct = "0.4.0"
|
||||
open-metrics-client = "0.14.0"
|
||||
prometheus-client = "0.15.0"
|
||||
|
||||
[dependencies.libp2p]
|
||||
version = "0.42.1"
|
||||
git = "https://github.com/sigp/rust-libp2p"
|
||||
# branch libp2p-gossipsub-interval-hotfix
|
||||
rev = "e213703e616eaba3c482d7714775e0d37c4ae8e5"
|
||||
default-features = false
|
||||
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns-tokio", "tcp-tokio", "plaintext", "secp256k1"]
|
||||
|
||||
|
||||
@@ -927,24 +927,6 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_connected(&mut self, _peer_id: &PeerId) {}
|
||||
fn inject_disconnected(&mut self, _peer_id: &PeerId) {}
|
||||
fn inject_connection_established(
|
||||
&mut self,
|
||||
_peer_id: &PeerId,
|
||||
_connection_id: &ConnectionId,
|
||||
_endpoint: &ConnectedPoint,
|
||||
_failed_addresses: Option<&Vec<Multiaddr>>,
|
||||
) {
|
||||
}
|
||||
fn inject_connection_closed(
|
||||
&mut self,
|
||||
_: &PeerId,
|
||||
_: &ConnectionId,
|
||||
_connected_point: &ConnectedPoint,
|
||||
_handler: Self::ProtocolsHandler,
|
||||
) {
|
||||
}
|
||||
fn inject_event(
|
||||
&mut self,
|
||||
_: PeerId,
|
||||
|
||||
@@ -67,7 +67,7 @@ pub use crate::types::{
|
||||
SubnetDiscovery,
|
||||
};
|
||||
|
||||
pub use open_metrics_client;
|
||||
pub use prometheus_client;
|
||||
|
||||
pub use behaviour::{BehaviourEvent, Gossipsub, PeerRequestId, Request, Response};
|
||||
pub use config::Config as NetworkConfig;
|
||||
|
||||
@@ -110,6 +110,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
|
||||
_connection_id: &ConnectionId,
|
||||
endpoint: &ConnectedPoint,
|
||||
_failed_addresses: Option<&Vec<Multiaddr>>,
|
||||
_other_established: usize,
|
||||
) {
|
||||
debug!(self.log, "Connection established"; "peer_id" => %peer_id, "connection" => ?endpoint.to_endpoint());
|
||||
// Check NAT if metrics are enabled
|
||||
@@ -172,8 +173,18 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
|
||||
self.update_connected_peer_metrics();
|
||||
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
|
||||
}
|
||||
fn inject_connection_closed(
|
||||
&mut self,
|
||||
peer_id: &PeerId,
|
||||
_: &ConnectionId,
|
||||
_: &ConnectedPoint,
|
||||
_: DummyProtocolsHandler,
|
||||
remaining_established: usize,
|
||||
) {
|
||||
if remaining_established > 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
fn inject_disconnected(&mut self, peer_id: &PeerId) {
|
||||
// There are no more connections
|
||||
if self
|
||||
.network_globals
|
||||
|
||||
@@ -202,36 +202,25 @@ where
|
||||
}
|
||||
|
||||
// Use connection established/closed instead of these currently
|
||||
fn inject_connected(&mut self, peer_id: &PeerId) {
|
||||
// find the peer's meta-data
|
||||
debug!(self.log, "Requesting new peer's metadata"; "peer_id" => %peer_id);
|
||||
let rpc_event =
|
||||
RPCSend::Request(RequestId::Behaviour, OutboundRequest::MetaData(PhantomData));
|
||||
self.events.push(NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id: *peer_id,
|
||||
handler: NotifyHandler::Any,
|
||||
event: rpc_event,
|
||||
});
|
||||
}
|
||||
|
||||
fn inject_disconnected(&mut self, _peer_id: &PeerId) {}
|
||||
|
||||
fn inject_connection_established(
|
||||
&mut self,
|
||||
_peer_id: &PeerId,
|
||||
peer_id: &PeerId,
|
||||
_connection_id: &ConnectionId,
|
||||
_endpoint: &ConnectedPoint,
|
||||
_failed_addresses: Option<&Vec<Multiaddr>>,
|
||||
other_established: usize,
|
||||
) {
|
||||
}
|
||||
|
||||
fn inject_connection_closed(
|
||||
&mut self,
|
||||
_peer_id: &PeerId,
|
||||
_: &ConnectionId,
|
||||
_connected_point: &ConnectedPoint,
|
||||
_handler: Self::ProtocolsHandler,
|
||||
) {
|
||||
if other_established == 0 {
|
||||
// find the peer's meta-data
|
||||
debug!(self.log, "Requesting new peer's metadata"; "peer_id" => %peer_id);
|
||||
let rpc_event =
|
||||
RPCSend::Request(RequestId::Behaviour, OutboundRequest::MetaData(PhantomData));
|
||||
self.events.push(NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id: *peer_id,
|
||||
handler: NotifyHandler::Any,
|
||||
event: rpc_event,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_event(
|
||||
|
||||
@@ -21,7 +21,7 @@ use libp2p::{
|
||||
swarm::{SwarmBuilder, SwarmEvent},
|
||||
PeerId, Swarm, Transport,
|
||||
};
|
||||
use open_metrics_client::registry::Registry;
|
||||
use prometheus_client::registry::Registry;
|
||||
use slog::{crit, debug, info, o, trace, warn, Logger};
|
||||
use ssz::Decode;
|
||||
use std::fs::File;
|
||||
|
||||
@@ -100,10 +100,8 @@ where
|
||||
inner: TInner,
|
||||
|
||||
pub addresses_of_peer: Vec<PeerId>,
|
||||
pub inject_connected: Vec<PeerId>,
|
||||
pub inject_disconnected: Vec<PeerId>,
|
||||
pub inject_connection_established: Vec<(PeerId, ConnectionId, ConnectedPoint)>,
|
||||
pub inject_connection_closed: Vec<(PeerId, ConnectionId, ConnectedPoint)>,
|
||||
pub inject_connection_established: Vec<(PeerId, ConnectionId, ConnectedPoint, usize)>,
|
||||
pub inject_connection_closed: Vec<(PeerId, ConnectionId, ConnectedPoint, usize)>,
|
||||
pub inject_event: Vec<(
|
||||
PeerId,
|
||||
ConnectionId,
|
||||
@@ -128,8 +126,6 @@ where
|
||||
Self {
|
||||
inner,
|
||||
addresses_of_peer: Vec::new(),
|
||||
inject_connected: Vec::new(),
|
||||
inject_disconnected: Vec::new(),
|
||||
inject_connection_established: Vec::new(),
|
||||
inject_connection_closed: Vec::new(),
|
||||
inject_event: Vec::new(),
|
||||
@@ -148,8 +144,6 @@ where
|
||||
#[allow(dead_code)]
|
||||
pub fn reset(&mut self) {
|
||||
self.addresses_of_peer = Vec::new();
|
||||
self.inject_connected = Vec::new();
|
||||
self.inject_disconnected = Vec::new();
|
||||
self.inject_connection_established = Vec::new();
|
||||
self.inject_connection_closed = Vec::new();
|
||||
self.inject_event = Vec::new();
|
||||
@@ -176,7 +170,13 @@ where
|
||||
expected_disconnections: usize,
|
||||
) -> bool {
|
||||
if self.inject_connection_closed.len() == expected_closed_connections {
|
||||
assert_eq!(self.inject_disconnected.len(), expected_disconnections);
|
||||
assert_eq!(
|
||||
self.inject_connection_closed
|
||||
.iter()
|
||||
.filter(|(.., remaining_established)| { *remaining_established == 0 })
|
||||
.count(),
|
||||
expected_disconnections
|
||||
);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -193,7 +193,15 @@ where
|
||||
expected_connections: usize,
|
||||
) -> bool {
|
||||
if self.inject_connection_established.len() == expected_established_connections {
|
||||
assert_eq!(self.inject_connected.len(), expected_connections);
|
||||
assert_eq!(
|
||||
self.inject_connection_established
|
||||
.iter()
|
||||
.filter(|(.., reported_aditional_connections)| {
|
||||
*reported_aditional_connections == 0
|
||||
})
|
||||
.count(),
|
||||
expected_connections
|
||||
);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -219,37 +227,45 @@ where
|
||||
self.inner.addresses_of_peer(p)
|
||||
}
|
||||
|
||||
fn inject_connected(&mut self, peer: &PeerId) {
|
||||
assert!(
|
||||
self.inject_connection_established
|
||||
.iter()
|
||||
.any(|(peer_id, _, _)| peer_id == peer),
|
||||
"`inject_connected` is called after at least one `inject_connection_established`."
|
||||
);
|
||||
self.inject_connected.push(*peer);
|
||||
self.inner.inject_connected(peer);
|
||||
}
|
||||
|
||||
fn inject_connection_established(
|
||||
&mut self,
|
||||
p: &PeerId,
|
||||
c: &ConnectionId,
|
||||
e: &ConnectedPoint,
|
||||
errors: Option<&Vec<Multiaddr>>,
|
||||
other_established: usize,
|
||||
) {
|
||||
self.inject_connection_established.push((*p, *c, e.clone()));
|
||||
self.inner.inject_connection_established(p, c, e, errors);
|
||||
}
|
||||
let mut other_peer_connections = self
|
||||
.inject_connection_established
|
||||
.iter()
|
||||
.rev() // take last to first
|
||||
.filter_map(|(peer, .., other_established)| {
|
||||
if p == peer {
|
||||
Some(other_established)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.take(other_established);
|
||||
|
||||
fn inject_disconnected(&mut self, peer: &PeerId) {
|
||||
assert!(
|
||||
self.inject_connection_closed
|
||||
.iter()
|
||||
.any(|(peer_id, _, _)| peer_id == peer),
|
||||
"`inject_disconnected` is called after at least one `inject_connection_closed`."
|
||||
);
|
||||
self.inject_disconnected.push(*peer);
|
||||
self.inner.inject_disconnected(peer);
|
||||
// We are informed that there are `other_established` additional connections. Ensure that the
|
||||
// number of previous connections is consistent with this
|
||||
if let Some(&prev) = other_peer_connections.next() {
|
||||
if prev < other_established {
|
||||
assert_eq!(
|
||||
prev,
|
||||
other_established - 1,
|
||||
"Inconsistent connection reporting"
|
||||
)
|
||||
}
|
||||
assert_eq!(other_peer_connections.count(), other_established - 1);
|
||||
} else {
|
||||
assert_eq!(other_established, 0)
|
||||
}
|
||||
self.inject_connection_established
|
||||
.push((*p, *c, e.clone(), other_established));
|
||||
self.inner
|
||||
.inject_connection_established(p, c, e, errors, other_established);
|
||||
}
|
||||
|
||||
fn inject_connection_closed(
|
||||
@@ -258,15 +274,46 @@ where
|
||||
c: &ConnectionId,
|
||||
e: &ConnectedPoint,
|
||||
handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
|
||||
remaining_established: usize,
|
||||
) {
|
||||
let connection = (*p, *c, e.clone());
|
||||
let mut other_closed_connections = self
|
||||
.inject_connection_established
|
||||
.iter()
|
||||
.rev() // take last to first
|
||||
.filter_map(|(peer, .., remaining_established)| {
|
||||
if p == peer {
|
||||
Some(remaining_established)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.take(remaining_established);
|
||||
|
||||
// We are informed that there are `other_established` additional connections. Ensure that the
|
||||
// number of previous connections is consistent with this
|
||||
if let Some(&prev) = other_closed_connections.next() {
|
||||
if prev < remaining_established {
|
||||
assert_eq!(
|
||||
prev,
|
||||
remaining_established - 1,
|
||||
"Inconsistent closed connection reporting"
|
||||
)
|
||||
}
|
||||
assert_eq!(other_closed_connections.count(), remaining_established - 1);
|
||||
} else {
|
||||
assert_eq!(remaining_established, 0)
|
||||
}
|
||||
assert!(
|
||||
self.inject_connection_established.contains(&connection),
|
||||
self.inject_connection_established
|
||||
.iter()
|
||||
.any(|(peer, conn_id, endpoint, _)| (peer, conn_id, endpoint) == (p, c, e)),
|
||||
"`inject_connection_closed` is called only for connections for \
|
||||
which `inject_connection_established` was called first."
|
||||
);
|
||||
self.inject_connection_closed.push(connection);
|
||||
self.inner.inject_connection_closed(p, c, e, handler);
|
||||
self.inject_connection_closed
|
||||
.push((*p, *c, e.clone(), remaining_established));
|
||||
self.inner
|
||||
.inject_connection_closed(p, c, e, handler, remaining_established);
|
||||
}
|
||||
|
||||
fn inject_event(
|
||||
@@ -278,14 +325,14 @@ where
|
||||
assert!(
|
||||
self.inject_connection_established
|
||||
.iter()
|
||||
.any(|(peer_id, conn_id, _)| *peer_id == p && c == *conn_id),
|
||||
.any(|(peer_id, conn_id, ..)| *peer_id == p && c == *conn_id),
|
||||
"`inject_event` is called for reported connections."
|
||||
);
|
||||
assert!(
|
||||
!self
|
||||
.inject_connection_closed
|
||||
.iter()
|
||||
.any(|(peer_id, conn_id, _)| *peer_id == p && c == *conn_id),
|
||||
.any(|(peer_id, conn_id, ..)| *peer_id == p && c == *conn_id),
|
||||
"`inject_event` is never called for closed connections."
|
||||
);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user