upgrade libp2p to v0.53.* (#4935)

* update libp2p and address compiler errors

* remove bandwidth logging from transport

* use libp2p registry

* make clippy happy

* use rust 1.73

* correct rpc keep alive

* remove comments and obsolte code

* remove libp2p prefix

* make clippy happy

* use quic under facade

* remove fast msg id

* bubble up close statements

* fix wrong comment
This commit is contained in:
Divma
2023-12-07 04:39:59 -05:00
committed by GitHub
parent 67e0569d9b
commit 6c0c41c7ac
17 changed files with 527 additions and 606 deletions

529
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +1,4 @@
FROM rust:1.69.0-bullseye AS builder FROM rust:1.73.0-bullseye AS builder
RUN apt-get update && apt-get -y upgrade && apt-get install -y cmake libclang-dev RUN apt-get update && apt-get -y upgrade && apt-get install -y cmake libclang-dev
COPY . lighthouse COPY . lighthouse
ARG FEATURES ARG FEATURES

View File

@@ -68,7 +68,7 @@ pub struct ClientBuilder<T: BeaconChainTypes> {
eth1_service: Option<Eth1Service>, eth1_service: Option<Eth1Service>,
network_globals: Option<Arc<NetworkGlobals<T::EthSpec>>>, network_globals: Option<Arc<NetworkGlobals<T::EthSpec>>>,
network_senders: Option<NetworkSenders<T::EthSpec>>, network_senders: Option<NetworkSenders<T::EthSpec>>,
gossipsub_registry: Option<Registry>, libp2p_registry: Option<Registry>,
db_path: Option<PathBuf>, db_path: Option<PathBuf>,
freezer_db_path: Option<PathBuf>, freezer_db_path: Option<PathBuf>,
http_api_config: http_api::Config, http_api_config: http_api::Config,
@@ -102,7 +102,7 @@ where
eth1_service: None, eth1_service: None,
network_globals: None, network_globals: None,
network_senders: None, network_senders: None,
gossipsub_registry: None, libp2p_registry: None,
db_path: None, db_path: None,
freezer_db_path: None, freezer_db_path: None,
http_api_config: <_>::default(), http_api_config: <_>::default(),
@@ -531,7 +531,7 @@ where
.ok_or("network requires beacon_processor_channels")?; .ok_or("network requires beacon_processor_channels")?;
// If gossipsub metrics are required we build a registry to record them // If gossipsub metrics are required we build a registry to record them
let mut gossipsub_registry = if config.metrics_enabled { let mut libp2p_registry = if config.metrics_enabled {
Some(Registry::default()) Some(Registry::default())
} else { } else {
None None
@@ -541,9 +541,7 @@ where
beacon_chain, beacon_chain,
config, config,
context.executor, context.executor,
gossipsub_registry libp2p_registry.as_mut(),
.as_mut()
.map(|registry| registry.sub_registry_with_prefix("gossipsub")),
beacon_processor_channels.beacon_processor_tx.clone(), beacon_processor_channels.beacon_processor_tx.clone(),
beacon_processor_channels.work_reprocessing_tx.clone(), beacon_processor_channels.work_reprocessing_tx.clone(),
) )
@@ -552,7 +550,7 @@ where
self.network_globals = Some(network_globals); self.network_globals = Some(network_globals);
self.network_senders = Some(network_senders); self.network_senders = Some(network_senders);
self.gossipsub_registry = gossipsub_registry; self.libp2p_registry = libp2p_registry;
Ok(self) Ok(self)
} }
@@ -718,7 +716,7 @@ where
chain: self.beacon_chain.clone(), chain: self.beacon_chain.clone(),
db_path: self.db_path.clone(), db_path: self.db_path.clone(),
freezer_db_path: self.freezer_db_path.clone(), freezer_db_path: self.freezer_db_path.clone(),
gossipsub_registry: self.gossipsub_registry.take().map(std::sync::Mutex::new), gossipsub_registry: self.libp2p_registry.take().map(std::sync::Mutex::new),
log: log.clone(), log: log.clone(),
}); });

View File

@@ -39,17 +39,16 @@ directory = { workspace = true }
regex = { workspace = true } regex = { workspace = true }
strum = { workspace = true } strum = { workspace = true }
superstruct = { workspace = true } superstruct = { workspace = true }
prometheus-client = "0.21.0" prometheus-client = "0.22.0"
unused_port = { workspace = true } unused_port = { workspace = true }
delay_map = { workspace = true } delay_map = { workspace = true }
void = "1" void = "1"
libp2p-quic= { version = "0.9.2", features=["tokio"]} libp2p-mplex = "0.41.0"
libp2p-mplex = "0.40.0"
[dependencies.libp2p] [dependencies.libp2p]
version = "0.52" version = "0.53"
default-features = false default-features = false
features = ["identify", "yamux", "noise", "gossipsub", "dns", "tcp", "tokio", "plaintext", "secp256k1", "macros", "ecdsa"] features = ["identify", "yamux", "noise", "gossipsub", "dns", "tcp", "tokio", "plaintext", "secp256k1", "macros", "ecdsa", "metrics", "quic"]
[dev-dependencies] [dev-dependencies]
slog-term = { workspace = true } slog-term = { workspace = true }

View File

@@ -455,12 +455,6 @@ pub fn gossipsub_config(
fork_context: Arc<ForkContext>, fork_context: Arc<ForkContext>,
gossipsub_config_params: GossipsubConfigParams, gossipsub_config_params: GossipsubConfigParams,
) -> gossipsub::Config { ) -> gossipsub::Config {
// The function used to generate a gossipsub message id
// We use the first 8 bytes of SHA256(topic, data) for content addressing
let fast_gossip_message_id = |message: &gossipsub::RawMessage| {
let data = [message.topic.as_str().as_bytes(), &message.data].concat();
gossipsub::FastMessageId::from(&Sha256::digest(&data)[..8])
};
fn prefix( fn prefix(
prefix: [u8; 4], prefix: [u8; 4],
message: &gossipsub::Message, message: &gossipsub::Message,
@@ -518,7 +512,6 @@ pub fn gossipsub_config(
.validation_mode(gossipsub::ValidationMode::Anonymous) .validation_mode(gossipsub::ValidationMode::Anonymous)
.duplicate_cache_time(DUPLICATE_CACHE_TIME) .duplicate_cache_time(DUPLICATE_CACHE_TIME)
.message_id_fn(gossip_message_id) .message_id_fn(gossip_message_id)
.fast_message_id_fn(fast_gossip_message_id)
.allow_self_origin(true) .allow_self_origin(true)
.build() .build()
.expect("valid gossipsub configuration") .expect("valid gossipsub configuration")

View File

@@ -29,7 +29,7 @@ pub use libp2p::{
identity::PeerId, identity::PeerId,
swarm::{ swarm::{
dummy::ConnectionHandler, ConnectionId, DialError, NetworkBehaviour, NotifyHandler, dummy::ConnectionHandler, ConnectionId, DialError, NetworkBehaviour, NotifyHandler,
PollParameters, SubstreamProtocol, ToSwarm, SubstreamProtocol, ToSwarm,
}, },
}; };
use lru::LruCache; use lru::LruCache;
@@ -955,11 +955,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
} }
// Main execution loop to drive the behaviour // Main execution loop to drive the behaviour
fn poll( fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
&mut self,
cx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if !self.started { if !self.started {
return Poll::Pending; return Poll::Pending;
} }
@@ -1041,7 +1037,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
Poll::Pending Poll::Pending
} }
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) { fn on_swarm_event(&mut self, event: FromSwarm) {
match event { match event {
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => { FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
self.on_dial_failure(peer_id, error) self.on_dial_failure(peer_id, error)
@@ -1114,17 +1110,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
Err(e) => warn!(self.log, "Failed to update ENR"; "error" => ?e), Err(e) => warn!(self.log, "Failed to update ENR"; "error" => ?e),
} }
} }
FromSwarm::ConnectionEstablished(_) _ => {
| FromSwarm::ConnectionClosed(_)
| FromSwarm::AddressChange(_)
| FromSwarm::ListenFailure(_)
| FromSwarm::NewListener(_)
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::ListenerError(_)
| FromSwarm::ListenerClosed(_)
| FromSwarm::NewExternalAddrCandidate(_)
| FromSwarm::ExternalAddrExpired(_)
| FromSwarm::ExternalAddrConfirmed(_) => {
// Ignore events not relevant to discovery // Ignore events not relevant to discovery
} }
} }

View File

@@ -115,7 +115,6 @@ pub use config::Config as NetworkConfig;
pub use discovery::{CombinedKeyExt, EnrExt, Eth2Enr}; pub use discovery::{CombinedKeyExt, EnrExt, Eth2Enr};
pub use discv5; pub use discv5;
pub use libp2p; pub use libp2p;
pub use libp2p::bandwidth::BandwidthSinks;
pub use libp2p::gossipsub::{IdentTopic, MessageAcceptance, MessageId, Topic, TopicHash}; pub use libp2p::gossipsub::{IdentTopic, MessageAcceptance, MessageId, Topic, TopicHash};
pub use libp2p::{core::ConnectedPoint, PeerId, Swarm}; pub use libp2p::{core::ConnectedPoint, PeerId, Swarm};
pub use libp2p::{multiaddr, Multiaddr}; pub use libp2p::{multiaddr, Multiaddr};

View File

@@ -1,6 +1,3 @@
use libp2p::bandwidth::BandwidthSinks;
use std::sync::Arc;
pub use lighthouse_metrics::*; pub use lighthouse_metrics::*;
lazy_static! { lazy_static! {
@@ -187,46 +184,3 @@ pub fn scrape_discovery_metrics() {
set_gauge(&DISCOVERY_SENT_BYTES, metrics.bytes_sent as i64); set_gauge(&DISCOVERY_SENT_BYTES, metrics.bytes_sent as i64);
set_gauge(&DISCOVERY_RECV_BYTES, metrics.bytes_recv as i64); set_gauge(&DISCOVERY_RECV_BYTES, metrics.bytes_recv as i64);
} }
/// Aggregated `BandwidthSinks` of tcp and quic transports
/// used in libp2p.
pub struct AggregatedBandwidthSinks {
tcp_sinks: Arc<BandwidthSinks>,
quic_sinks: Option<Arc<BandwidthSinks>>,
}
impl AggregatedBandwidthSinks {
/// Create a new `AggregatedBandwidthSinks`.
pub fn new(tcp_sinks: Arc<BandwidthSinks>, quic_sinks: Option<Arc<BandwidthSinks>>) -> Self {
AggregatedBandwidthSinks {
tcp_sinks,
quic_sinks,
}
}
/// Total QUIC inbound bandwidth.
pub fn total_quic_inbound(&self) -> u64 {
self.quic_sinks
.as_ref()
.map(|q| q.total_inbound())
.unwrap_or_default()
}
/// Total TCP inbound bandwidth.
pub fn total_tcp_inbound(&self) -> u64 {
self.tcp_sinks.total_inbound()
}
/// Total QUIC outbound bandwidth.
pub fn total_quic_outbound(&self) -> u64 {
self.quic_sinks
.as_ref()
.map(|q| q.total_outbound())
.unwrap_or_default()
}
/// Total TCP outbound bandwidth.
pub fn total_tcp_outbound(&self) -> u64 {
self.tcp_sinks.total_outbound()
}
}

View File

@@ -9,7 +9,7 @@ use libp2p::identity::PeerId;
use libp2p::swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; use libp2p::swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
use libp2p::swarm::dummy::ConnectionHandler; use libp2p::swarm::dummy::ConnectionHandler;
use libp2p::swarm::{ConnectionDenied, ConnectionId, NetworkBehaviour, PollParameters, ToSwarm}; use libp2p::swarm::{ConnectionDenied, ConnectionId, NetworkBehaviour, ToSwarm};
use slog::{debug, error, trace}; use slog::{debug, error, trace};
use types::EthSpec; use types::EthSpec;
@@ -36,11 +36,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
// no events from the dummy handler // no events from the dummy handler
} }
fn poll( fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, void::Void>> {
&mut self,
cx: &mut Context<'_>,
_params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, void::Void>> {
// perform the heartbeat when necessary // perform the heartbeat when necessary
while self.heartbeat.poll_tick(cx).is_ready() { while self.heartbeat.poll_tick(cx).is_ready() {
self.heartbeat(); self.heartbeat();
@@ -121,7 +117,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
Poll::Pending Poll::Pending
} }
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) { fn on_swarm_event(&mut self, event: FromSwarm) {
match event { match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished { FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id, peer_id,
@@ -155,15 +151,9 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
// TODO: we likely want to check this against our assumed external tcp // TODO: we likely want to check this against our assumed external tcp
// address // address
} }
FromSwarm::AddressChange(_) _ => {
| FromSwarm::ListenFailure(_) // NOTE: FromSwarm is a non exhaustive enum so updates should be based on release
| FromSwarm::NewListener(_) // notes more than compiler feedback
| FromSwarm::NewListenAddr(_)
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::ListenerError(_)
| FromSwarm::ListenerClosed(_)
| FromSwarm::NewExternalAddrCandidate(_)
| FromSwarm::ExternalAddrExpired(_) => {
// The rest of the events we ignore since they are handled in their associated // The rest of the events we ignore since they are handled in their associated
// `SwarmEvent` // `SwarmEvent`
} }

View File

@@ -12,8 +12,7 @@ use futures::prelude::*;
use futures::{Sink, SinkExt}; use futures::{Sink, SinkExt};
use libp2p::swarm::handler::{ use libp2p::swarm::handler::{
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, StreamUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
SubstreamProtocol,
}; };
use libp2p::swarm::Stream; use libp2p::swarm::Stream;
use slog::{crit, debug, trace, warn}; use slog::{crit, debug, trace, warn};
@@ -51,7 +50,12 @@ impl SubstreamId {
type InboundSubstream<TSpec> = InboundFramed<Stream, TSpec>; type InboundSubstream<TSpec> = InboundFramed<Stream, TSpec>;
/// Events the handler emits to the behaviour. /// Events the handler emits to the behaviour.
pub type HandlerEvent<Id, T> = Result<RPCReceived<Id, T>, HandlerErr<Id>>; #[derive(Debug)]
pub enum HandlerEvent<Id, T: EthSpec> {
Ok(RPCReceived<Id, T>),
Err(HandlerErr<Id>),
Close(RPCError),
}
/// An error encountered by the handler. /// An error encountered by the handler.
#[derive(Debug)] #[derive(Debug)]
@@ -249,11 +253,12 @@ where
} }
// We now drive to completion communications already dialed/established // We now drive to completion communications already dialed/established
while let Some((id, req)) = self.dial_queue.pop() { while let Some((id, req)) = self.dial_queue.pop() {
self.events_out.push(Err(HandlerErr::Outbound { self.events_out
error: RPCError::Disconnected, .push(HandlerEvent::Err(HandlerErr::Outbound {
proto: req.versioned_protocol().protocol(), error: RPCError::Disconnected,
id, proto: req.versioned_protocol().protocol(),
})); id,
}));
} }
// Queue our goodbye message. // Queue our goodbye message.
@@ -273,11 +278,13 @@ where
HandlerState::Active => { HandlerState::Active => {
self.dial_queue.push((id, req)); self.dial_queue.push((id, req));
} }
_ => self.events_out.push(Err(HandlerErr::Outbound { _ => self
error: RPCError::Disconnected, .events_out
proto: req.versioned_protocol().protocol(), .push(HandlerEvent::Err(HandlerErr::Outbound {
id, error: RPCError::Disconnected,
})), proto: req.versioned_protocol().protocol(),
id,
})),
} }
} }
@@ -296,7 +303,7 @@ where
}; };
// If the response we are sending is an error, report back for handling // If the response we are sending is an error, report back for handling
if let RPCCodedResponse::Error(ref code, ref reason) = response { if let RPCCodedResponse::Error(ref code, ref reason) = response {
self.events_out.push(Err(HandlerErr::Inbound { self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
error: RPCError::ErrorResponse(*code, reason.to_string()), error: RPCError::ErrorResponse(*code, reason.to_string()),
proto: inbound_info.protocol, proto: inbound_info.protocol,
id: inbound_id, id: inbound_id,
@@ -320,7 +327,6 @@ where
{ {
type FromBehaviour = RPCSend<Id, TSpec>; type FromBehaviour = RPCSend<Id, TSpec>;
type ToBehaviour = HandlerEvent<Id, TSpec>; type ToBehaviour = HandlerEvent<Id, TSpec>;
type Error = RPCError;
type InboundProtocol = RPCProtocol<TSpec>; type InboundProtocol = RPCProtocol<TSpec>;
type OutboundProtocol = OutboundRequestContainer<TSpec>; type OutboundProtocol = OutboundRequestContainer<TSpec>;
type OutboundOpenInfo = (Id, OutboundRequest<TSpec>); // Keep track of the id and the request type OutboundOpenInfo = (Id, OutboundRequest<TSpec>); // Keep track of the id and the request
@@ -342,28 +348,23 @@ where
} }
} }
fn connection_keep_alive(&self) -> KeepAlive { fn connection_keep_alive(&self) -> bool {
// Check that we don't have outbound items pending for dialing, nor dialing, nor // Check that we don't have outbound items pending for dialing, nor dialing, nor
// established. Also check that there are no established inbound substreams. // established. Also check that there are no established inbound substreams.
// Errors and events need to be reported back, so check those too. // Errors and events need to be reported back, so check those too.
let should_shutdown = match self.state { match self.state {
HandlerState::ShuttingDown(_) => { HandlerState::ShuttingDown(_) => {
self.dial_queue.is_empty() !self.dial_queue.is_empty()
&& self.outbound_substreams.is_empty() || !self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty() || !self.inbound_substreams.is_empty()
&& self.events_out.is_empty() || !self.events_out.is_empty()
&& self.dial_negotiated == 0 || !self.dial_negotiated != 0
} }
HandlerState::Deactivated => { HandlerState::Deactivated => {
// Regardless of events, the timeout has expired. Force the disconnect. // Regardless of events, the timeout has expired. Force the disconnect.
true false
} }
_ => false, _ => true,
};
if should_shutdown {
KeepAlive::No
} else {
KeepAlive::Yes
} }
} }
@@ -371,12 +372,7 @@ where
&mut self, &mut self,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll< ) -> Poll<
ConnectionHandlerEvent< ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
> { > {
if let Some(waker) = &self.waker { if let Some(waker) = &self.waker {
if waker.will_wake(cx.waker()) { if waker.will_wake(cx.waker()) {
@@ -400,7 +396,9 @@ where
Poll::Ready(_) => { Poll::Ready(_) => {
self.state = HandlerState::Deactivated; self.state = HandlerState::Deactivated;
debug!(self.log, "Handler deactivated"); debug!(self.log, "Handler deactivated");
return Poll::Ready(ConnectionHandlerEvent::Close(RPCError::Disconnected)); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::Close(RPCError::Disconnected),
));
} }
Poll::Pending => {} Poll::Pending => {}
}; };
@@ -414,7 +412,7 @@ where
if let Some(info) = self.inbound_substreams.get_mut(inbound_id.get_ref()) { if let Some(info) = self.inbound_substreams.get_mut(inbound_id.get_ref()) {
// the delay has been removed // the delay has been removed
info.delay_key = None; info.delay_key = None;
self.events_out.push(Err(HandlerErr::Inbound { self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
error: RPCError::StreamTimeout, error: RPCError::StreamTimeout,
proto: info.protocol, proto: info.protocol,
id: *inbound_id.get_ref(), id: *inbound_id.get_ref(),
@@ -432,9 +430,11 @@ where
Poll::Ready(Some(Err(e))) => { Poll::Ready(Some(Err(e))) => {
warn!(self.log, "Inbound substream poll failed"; "error" => ?e); warn!(self.log, "Inbound substream poll failed"; "error" => ?e);
// drops the peer if we cannot read the delay queue // drops the peer if we cannot read the delay queue
return Poll::Ready(ConnectionHandlerEvent::Close(RPCError::InternalError( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
"Could not poll inbound stream timer", HandlerEvent::Close(RPCError::InternalError(
))); "Could not poll inbound stream timer",
)),
));
} }
Poll::Pending | Poll::Ready(None) => break, Poll::Pending | Poll::Ready(None) => break,
} }
@@ -453,18 +453,20 @@ where
error: RPCError::StreamTimeout, error: RPCError::StreamTimeout,
}; };
// notify the user // notify the user
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
outbound_err, HandlerEvent::Err(outbound_err),
))); ));
} else { } else {
crit!(self.log, "timed out substream not in the books"; "stream_id" => outbound_id.get_ref()); crit!(self.log, "timed out substream not in the books"; "stream_id" => outbound_id.get_ref());
} }
} }
Poll::Ready(Some(Err(e))) => { Poll::Ready(Some(Err(e))) => {
warn!(self.log, "Outbound substream poll failed"; "error" => ?e); warn!(self.log, "Outbound substream poll failed"; "error" => ?e);
return Poll::Ready(ConnectionHandlerEvent::Close(RPCError::InternalError( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
"Could not poll outbound stream timer", HandlerEvent::Close(RPCError::InternalError(
))); "Could not poll outbound stream timer",
)),
));
} }
Poll::Pending | Poll::Ready(None) => break, Poll::Pending | Poll::Ready(None) => break,
} }
@@ -516,7 +518,7 @@ where
// If there was an error in shutting down the substream report the // If there was an error in shutting down the substream report the
// error // error
if let Err(error) = res { if let Err(error) = res {
self.events_out.push(Err(HandlerErr::Inbound { self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
error, error,
proto: info.protocol, proto: info.protocol,
id: *id, id: *id,
@@ -528,7 +530,7 @@ where
if info.pending_items.back().map(|l| l.close_after()) == Some(false) if info.pending_items.back().map(|l| l.close_after()) == Some(false)
{ {
// if the request was still active, report back to cancel it // if the request was still active, report back to cancel it
self.events_out.push(Err(HandlerErr::Inbound { self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
error: RPCError::Disconnected, error: RPCError::Disconnected,
proto: info.protocol, proto: info.protocol,
id: *id, id: *id,
@@ -613,7 +615,7 @@ where
self.inbound_substreams_delay.remove(delay_key); self.inbound_substreams_delay.remove(delay_key);
} }
// Report the error that occurred during the send process // Report the error that occurred during the send process
self.events_out.push(Err(HandlerErr::Inbound { self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
error, error,
proto: info.protocol, proto: info.protocol,
id: *id, id: *id,
@@ -666,11 +668,12 @@ where
} if deactivated => { } if deactivated => {
// the handler is deactivated. Close the stream // the handler is deactivated. Close the stream
entry.get_mut().state = OutboundSubstreamState::Closing(substream); entry.get_mut().state = OutboundSubstreamState::Closing(substream);
self.events_out.push(Err(HandlerErr::Outbound { self.events_out
error: RPCError::Disconnected, .push(HandlerEvent::Err(HandlerErr::Outbound {
proto: entry.get().proto, error: RPCError::Disconnected,
id: entry.get().req_id, proto: entry.get().proto,
})) id: entry.get().req_id,
}))
} }
OutboundSubstreamState::RequestPendingResponse { OutboundSubstreamState::RequestPendingResponse {
mut substream, mut substream,
@@ -711,14 +714,18 @@ where
let received = match response { let received = match response {
RPCCodedResponse::StreamTermination(t) => { RPCCodedResponse::StreamTermination(t) => {
Ok(RPCReceived::EndOfStream(id, t)) HandlerEvent::Ok(RPCReceived::EndOfStream(id, t))
}
RPCCodedResponse::Success(resp) => {
HandlerEvent::Ok(RPCReceived::Response(id, resp))
}
RPCCodedResponse::Error(ref code, ref r) => {
HandlerEvent::Err(HandlerErr::Outbound {
id,
proto,
error: RPCError::ErrorResponse(*code, r.to_string()),
})
} }
RPCCodedResponse::Success(resp) => Ok(RPCReceived::Response(id, resp)),
RPCCodedResponse::Error(ref code, ref r) => Err(HandlerErr::Outbound {
id,
proto,
error: RPCError::ErrorResponse(*code, r.to_string()),
}),
}; };
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(received)); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(received));
@@ -736,9 +743,12 @@ where
// notify the application error // notify the application error
if request.expected_responses() > 1 { if request.expected_responses() > 1 {
// return an end of stream result // return an end of stream result
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
RPCReceived::EndOfStream(request_id, request.stream_termination()), HandlerEvent::Ok(RPCReceived::EndOfStream(
))); request_id,
request.stream_termination(),
)),
));
} }
// else we return an error, stream should not have closed early. // else we return an error, stream should not have closed early.
@@ -747,9 +757,9 @@ where
proto: request.versioned_protocol().protocol(), proto: request.versioned_protocol().protocol(),
error: RPCError::IncompleteStream, error: RPCError::IncompleteStream,
}; };
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
outbound_err, HandlerEvent::Err(outbound_err),
))); ));
} }
Poll::Pending => { Poll::Pending => {
entry.get_mut().state = entry.get_mut().state =
@@ -765,9 +775,9 @@ where
error: e, error: e,
}; };
entry.remove_entry(); entry.remove_entry();
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
outbound_err, HandlerEvent::Err(outbound_err),
))); ));
} }
}, },
OutboundSubstreamState::Closing(mut substream) => { OutboundSubstreamState::Closing(mut substream) => {
@@ -788,9 +798,12 @@ where
// termination to the application // termination to the application
if let Some(termination) = protocol.terminator() { if let Some(termination) = protocol.terminator() {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
RPCReceived::EndOfStream(request_id, termination), HandlerEvent::Ok(RPCReceived::EndOfStream(
))); request_id,
termination,
)),
));
} }
} }
Poll::Pending => { Poll::Pending => {
@@ -831,7 +844,9 @@ where
&& self.events_out.is_empty() && self.events_out.is_empty()
&& self.dial_negotiated == 0 && self.dial_negotiated == 0
{ {
return Poll::Ready(ConnectionHandlerEvent::Close(RPCError::Disconnected)); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::Close(RPCError::Disconnected),
));
} }
} }
@@ -859,24 +874,9 @@ where
ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) => { ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) => {
self.on_dial_upgrade_error(info, error) self.on_dial_upgrade_error(info, error)
} }
ConnectionEvent::ListenUpgradeError(libp2p::swarm::handler::ListenUpgradeError { _ => {
info: _, // NOTE: ConnectionEvent is a non exhaustive enum so updates should be based on
error: _, /* RPCError */ // release notes more than compiler feedback
}) => {
// This is going to be removed in the next libp2p release. I think its fine to do
// nothing.
}
ConnectionEvent::LocalProtocolsChange(_) => {
// This shouldn't effect this handler, we will still negotiate streams if we support
// the protocol as usual.
}
ConnectionEvent::RemoteProtocolsChange(_) => {
// This shouldn't effect this handler, we will still negotiate streams if we support
// the protocol as usual.
}
ConnectionEvent::AddressChange(_) => {
// We dont care about these changes as they have no bearing on our RPC internal
// logic.
} }
} }
} }
@@ -919,7 +919,7 @@ where
}, },
); );
} else { } else {
self.events_out.push(Err(HandlerErr::Inbound { self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
id: self.current_inbound_substream_id, id: self.current_inbound_substream_id,
proto: req.versioned_protocol().protocol(), proto: req.versioned_protocol().protocol(),
error: RPCError::HandlerRejected, error: RPCError::HandlerRejected,
@@ -933,7 +933,7 @@ where
self.shutdown(None); self.shutdown(None);
} }
self.events_out.push(Ok(RPCReceived::Request( self.events_out.push(HandlerEvent::Ok(RPCReceived::Request(
self.current_inbound_substream_id, self.current_inbound_substream_id,
req, req,
))); )));
@@ -953,11 +953,12 @@ where
// accept outbound connections only if the handler is not deactivated // accept outbound connections only if the handler is not deactivated
if matches!(self.state, HandlerState::Deactivated) { if matches!(self.state, HandlerState::Deactivated) {
self.events_out.push(Err(HandlerErr::Outbound { self.events_out
error: RPCError::Disconnected, .push(HandlerEvent::Err(HandlerErr::Outbound {
proto, error: RPCError::Disconnected,
id, proto,
})); id,
}));
} }
// add the stream to substreams if we expect a response, otherwise drop the stream. // add the stream to substreams if we expect a response, otherwise drop the stream.
@@ -1030,11 +1031,12 @@ where
self.dial_negotiated -= 1; self.dial_negotiated -= 1;
self.outbound_io_error_retries = 0; self.outbound_io_error_retries = 0;
self.events_out.push(Err(HandlerErr::Outbound { self.events_out
error, .push(HandlerEvent::Err(HandlerErr::Outbound {
proto: req.versioned_protocol().protocol(), error,
id, proto: req.versioned_protocol().protocol(),
})); id,
}));
} }
} }

View File

@@ -5,10 +5,9 @@
//! syncing. //! syncing.
use futures::future::FutureExt; use futures::future::FutureExt;
use handler::{HandlerEvent, RPCHandler}; use handler::RPCHandler;
use libp2p::swarm::{ use libp2p::swarm::{
handler::ConnectionHandler, ConnectionId, NetworkBehaviour, NotifyHandler, PollParameters, handler::ConnectionHandler, ConnectionId, NetworkBehaviour, NotifyHandler, ToSwarm,
ToSwarm,
}; };
use libp2p::swarm::{FromSwarm, SubstreamProtocol, THandlerInEvent}; use libp2p::swarm::{FromSwarm, SubstreamProtocol, THandlerInEvent};
use libp2p::PeerId; use libp2p::PeerId;
@@ -20,7 +19,7 @@ use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
use types::{EthSpec, ForkContext}; use types::{EthSpec, ForkContext};
pub(crate) use handler::HandlerErr; pub(crate) use handler::{HandlerErr, HandlerEvent};
pub(crate) use methods::{MetaData, MetaDataV1, MetaDataV2, Ping, RPCCodedResponse, RPCResponse}; pub(crate) use methods::{MetaData, MetaDataV1, MetaDataV2, Ping, RPCCodedResponse, RPCResponse};
pub(crate) use protocol::InboundRequest; pub(crate) use protocol::InboundRequest;
@@ -282,25 +281,9 @@ where
Ok(handler) Ok(handler)
} }
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) { fn on_swarm_event(&mut self, _event: FromSwarm) {
match event { // NOTE: FromSwarm is a non exhaustive enum so updates should be based on release notes more
FromSwarm::ConnectionClosed(_) // than compiler feedback
| FromSwarm::ConnectionEstablished(_)
| FromSwarm::AddressChange(_)
| FromSwarm::DialFailure(_)
| FromSwarm::ListenFailure(_)
| FromSwarm::NewListener(_)
| FromSwarm::NewListenAddr(_)
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::ListenerError(_)
| FromSwarm::ListenerClosed(_)
| FromSwarm::NewExternalAddrCandidate(_)
| FromSwarm::ExternalAddrExpired(_)
| FromSwarm::ExternalAddrConfirmed(_) => {
// Rpc Behaviour does not act on these swarm events. We use a comprehensive match
// statement to ensure future events are dealt with appropriately.
}
}
} }
fn on_connection_handler_event( fn on_connection_handler_event(
@@ -309,7 +292,7 @@ where
conn_id: ConnectionId, conn_id: ConnectionId,
event: <Self::ConnectionHandler as ConnectionHandler>::ToBehaviour, event: <Self::ConnectionHandler as ConnectionHandler>::ToBehaviour,
) { ) {
if let Ok(RPCReceived::Request(ref id, ref req)) = event { if let HandlerEvent::Ok(RPCReceived::Request(ref id, ref req)) = event {
if let Some(limiter) = self.limiter.as_mut() { if let Some(limiter) = self.limiter.as_mut() {
// check if the request is conformant to the quota // check if the request is conformant to the quota
match limiter.allows(&peer_id, req) { match limiter.allows(&peer_id, req) {
@@ -374,11 +357,7 @@ where
} }
} }
fn poll( fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
&mut self,
cx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
// let the rate limiter prune. // let the rate limiter prune.
if let Some(limiter) = self.limiter.as_mut() { if let Some(limiter) = self.limiter.as_mut() {
let _ = limiter.poll_unpin(cx); let _ = limiter.poll_unpin(cx);
@@ -409,27 +388,38 @@ where
serializer: &mut dyn slog::Serializer, serializer: &mut dyn slog::Serializer,
) -> slog::Result { ) -> slog::Result {
serializer.emit_arguments("peer_id", &format_args!("{}", self.peer_id))?; serializer.emit_arguments("peer_id", &format_args!("{}", self.peer_id))?;
let (msg_kind, protocol) = match &self.event { match &self.event {
Ok(received) => match received { HandlerEvent::Ok(received) => {
RPCReceived::Request(_, req) => ("request", req.versioned_protocol().protocol()), let (msg_kind, protocol) = match received {
RPCReceived::Response(_, res) => ("response", res.protocol()), RPCReceived::Request(_, req) => {
RPCReceived::EndOfStream(_, end) => ( ("request", req.versioned_protocol().protocol())
"end_of_stream", }
match end { RPCReceived::Response(_, res) => ("response", res.protocol()),
ResponseTermination::BlocksByRange => Protocol::BlocksByRange, RPCReceived::EndOfStream(_, end) => (
ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot, "end_of_stream",
ResponseTermination::BlobsByRange => Protocol::BlobsByRange, match end {
ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot, ResponseTermination::BlocksByRange => Protocol::BlocksByRange,
}, ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot,
), ResponseTermination::BlobsByRange => Protocol::BlobsByRange,
}, ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot,
Err(error) => match &error { },
HandlerErr::Inbound { proto, .. } => ("inbound_err", *proto), ),
HandlerErr::Outbound { proto, .. } => ("outbound_err", *proto), };
}, serializer.emit_str("msg_kind", msg_kind)?;
serializer.emit_arguments("protocol", &format_args!("{}", protocol))?;
}
HandlerEvent::Err(error) => {
let (msg_kind, protocol) = match &error {
HandlerErr::Inbound { proto, .. } => ("inbound_err", *proto),
HandlerErr::Outbound { proto, .. } => ("outbound_err", *proto),
};
serializer.emit_str("msg_kind", msg_kind)?;
serializer.emit_arguments("protocol", &format_args!("{}", protocol))?;
}
HandlerEvent::Close(err) => {
serializer.emit_arguments("handler_close", &format_args!("{}", err))?;
}
}; };
serializer.emit_str("msg_kind", msg_kind)?;
serializer.emit_arguments("protocol", &format_args!("{}", protocol))?;
slog::Result::Ok(()) slog::Result::Ok(())
} }

View File

@@ -4,7 +4,6 @@ use crate::config::{gossipsub_config, GossipsubConfigParams, NetworkLoad};
use crate::discovery::{ use crate::discovery::{
subnet_predicate, DiscoveredPeers, Discovery, FIND_NODE_QUERY_CLOSEST_PEERS, subnet_predicate, DiscoveredPeers, Discovery, FIND_NODE_QUERY_CLOSEST_PEERS,
}; };
use crate::metrics::AggregatedBandwidthSinks;
use crate::peer_manager::{ use crate::peer_manager::{
config::Config as PeerManagerCfg, peerdb::score::PeerAction, peerdb::score::ReportSource, config::Config as PeerManagerCfg, peerdb::score::PeerAction, peerdb::score::ReportSource,
ConnectionDirection, PeerManager, PeerManagerEvent, ConnectionDirection, PeerManager, PeerManagerEvent,
@@ -127,8 +126,6 @@ pub struct Network<AppReqId: ReqId, TSpec: EthSpec> {
/// The interval for updating gossipsub scores /// The interval for updating gossipsub scores
update_gossipsub_scores: tokio::time::Interval, update_gossipsub_scores: tokio::time::Interval,
gossip_cache: GossipCache, gossip_cache: GossipCache,
/// The bandwidth logger for the underlying libp2p transport.
pub bandwidth: AggregatedBandwidthSinks,
/// This node's PeerId. /// This node's PeerId.
pub local_peer_id: PeerId, pub local_peer_id: PeerId,
/// Logger for behaviour actions. /// Logger for behaviour actions.
@@ -139,10 +136,11 @@ pub struct Network<AppReqId: ReqId, TSpec: EthSpec> {
impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> { impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
pub async fn new( pub async fn new(
executor: task_executor::TaskExecutor, executor: task_executor::TaskExecutor,
ctx: ServiceContext<'_>, mut ctx: ServiceContext<'_>,
log: &slog::Logger, log: &slog::Logger,
) -> error::Result<(Self, Arc<NetworkGlobals<TSpec>>)> { ) -> error::Result<(Self, Arc<NetworkGlobals<TSpec>>)> {
let log = log.new(o!("service"=> "libp2p")); let log = log.new(o!("service"=> "libp2p"));
let mut config = ctx.config.clone(); let mut config = ctx.config.clone();
trace!(log, "Libp2p Service starting"); trace!(log, "Libp2p Service starting");
// initialise the node's ID // initialise the node's ID
@@ -257,10 +255,13 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
gossipsub_config_params, gossipsub_config_params,
); );
// If metrics are enabled for gossipsub build the configuration // If metrics are enabled for libp2p build the configuration
let gossipsub_metrics = ctx let gossipsub_metrics = ctx.libp2p_registry.as_mut().map(|registry| {
.gossipsub_registry (
.map(|registry| (registry, Default::default())); registry.sub_registry_with_prefix("gossipsub"),
Default::default(),
)
});
let snappy_transform = SnappyTransform::new(config.gs_config.max_transmit_size()); let snappy_transform = SnappyTransform::new(config.gs_config.max_transmit_size());
let mut gossipsub = Gossipsub::new_with_subscription_filter_and_transform( let mut gossipsub = Gossipsub::new_with_subscription_filter_and_transform(
@@ -366,9 +367,8 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
}; };
// Set up the transport - tcp/quic with noise and mplex // Set up the transport - tcp/quic with noise and mplex
let (transport, bandwidth) = let transport = build_transport(local_keypair.clone(), !config.disable_quic_support)
build_transport(local_keypair.clone(), !config.disable_quic_support) .map_err(|e| format!("Failed to build transport: {:?}", e))?;
.map_err(|e| format!("Failed to build transport: {:?}", e))?;
// use the executor for libp2p // use the executor for libp2p
struct Executor(task_executor::TaskExecutor); struct Executor(task_executor::TaskExecutor);
@@ -379,20 +379,41 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
} }
// sets up the libp2p swarm. // sets up the libp2p swarm.
let swarm = SwarmBuilder::with_existing_identity(local_keypair)
.with_tokio() let swarm = {
.with_other_transport(|_key| transport) let builder = SwarmBuilder::with_existing_identity(local_keypair)
.expect("infalible") .with_tokio()
.with_behaviour(|_| behaviour) .with_other_transport(|_key| transport)
.expect("infalible") .expect("infalible");
.with_swarm_config(|_| {
libp2p::swarm::Config::with_executor(Executor(executor)) // NOTE: adding bandwidth metrics changes the generics of the swarm, so types diverge
.with_notify_handler_buffer_size( if let Some(libp2p_registry) = ctx.libp2p_registry {
std::num::NonZeroUsize::new(7).expect("Not zero"), builder
) .with_bandwidth_metrics(libp2p_registry)
.with_per_connection_event_buffer_size(4) .with_behaviour(|_| behaviour)
}) .expect("infalible")
.build(); .with_swarm_config(|_| {
libp2p::swarm::Config::with_executor(Executor(executor))
.with_notify_handler_buffer_size(
std::num::NonZeroUsize::new(7).expect("Not zero"),
)
.with_per_connection_event_buffer_size(4)
})
.build()
} else {
builder
.with_behaviour(|_| behaviour)
.expect("infalible")
.with_swarm_config(|_| {
libp2p::swarm::Config::with_executor(Executor(executor))
.with_notify_handler_buffer_size(
std::num::NonZeroUsize::new(7).expect("Not zero"),
)
.with_per_connection_event_buffer_size(4)
})
.build()
}
};
let mut network = Network { let mut network = Network {
swarm, swarm,
@@ -403,7 +424,6 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
score_settings, score_settings,
update_gossipsub_scores, update_gossipsub_scores,
gossip_cache, gossip_cache,
bandwidth,
local_peer_id, local_peer_id,
log, log,
}; };
@@ -1251,7 +1271,7 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
let handler_id = event.conn_id; let handler_id = event.conn_id;
// The METADATA and PING RPC responses are handled within the behaviour and not propagated // The METADATA and PING RPC responses are handled within the behaviour and not propagated
match event.event { match event.event {
Err(handler_err) => { HandlerEvent::Err(handler_err) => {
match handler_err { match handler_err {
HandlerErr::Inbound { HandlerErr::Inbound {
id: _, id: _,
@@ -1286,7 +1306,7 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
} }
} }
} }
Ok(RPCReceived::Request(id, request)) => { HandlerEvent::Ok(RPCReceived::Request(id, request)) => {
let peer_request_id = (handler_id, id); let peer_request_id = (handler_id, id);
match request { match request {
/* Behaviour managed protocols: Ping and Metadata */ /* Behaviour managed protocols: Ping and Metadata */
@@ -1385,7 +1405,7 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
} }
} }
} }
Ok(RPCReceived::Response(id, resp)) => { HandlerEvent::Ok(RPCReceived::Response(id, resp)) => {
match resp { match resp {
/* Behaviour managed protocols */ /* Behaviour managed protocols */
RPCResponse::Pong(ping) => { RPCResponse::Pong(ping) => {
@@ -1422,7 +1442,7 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
} }
} }
} }
Ok(RPCReceived::EndOfStream(id, termination)) => { HandlerEvent::Ok(RPCReceived::EndOfStream(id, termination)) => {
let response = match termination { let response = match termination {
ResponseTermination::BlocksByRange => Response::BlocksByRange(None), ResponseTermination::BlocksByRange => Response::BlocksByRange(None),
ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None), ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None),
@@ -1431,6 +1451,11 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
}; };
self.build_response(id, peer_id, response) self.build_response(id, peer_id, response)
} }
HandlerEvent::Close(_) => {
let _ = self.swarm.disconnect_peer_id(peer_id);
// NOTE: we wait for the swarm to report the connection as actually closed
None
}
} }
} }
@@ -1624,7 +1649,11 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
None None
} }
} }
SwarmEvent::Dialing { .. } => None, _ => {
// NOTE: SwarmEvent is a non exhaustive enum so updates should be based on
// release notes more than compiler feedback
None
}
}; };
if let Some(ev) = maybe_event { if let Some(ev) = maybe_event {

View File

@@ -1,4 +1,3 @@
use crate::metrics::AggregatedBandwidthSinks;
use crate::multiaddr::Protocol; use crate::multiaddr::Protocol;
use crate::rpc::{MetaData, MetaDataV1, MetaDataV2}; use crate::rpc::{MetaData, MetaDataV1, MetaDataV2};
use crate::types::{ use crate::types::{
@@ -9,8 +8,8 @@ use futures::future::Either;
use libp2p::core::{multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::Boxed}; use libp2p::core::{multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::Boxed};
use libp2p::gossipsub; use libp2p::gossipsub;
use libp2p::identity::{secp256k1, Keypair}; use libp2p::identity::{secp256k1, Keypair};
use libp2p::{core, noise, yamux, PeerId, Transport, TransportExt}; use libp2p::quic;
use libp2p_quic; use libp2p::{core, noise, yamux, PeerId, Transport};
use prometheus_client::registry::Registry; use prometheus_client::registry::Registry;
use slog::{debug, warn}; use slog::{debug, warn};
use ssz::Decode; use ssz::Decode;
@@ -34,7 +33,7 @@ pub struct Context<'a> {
pub enr_fork_id: EnrForkId, pub enr_fork_id: EnrForkId,
pub fork_context: Arc<ForkContext>, pub fork_context: Arc<ForkContext>,
pub chain_spec: &'a ChainSpec, pub chain_spec: &'a ChainSpec,
pub gossipsub_registry: Option<&'a mut Registry>, pub libp2p_registry: Option<&'a mut Registry>,
} }
type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>; type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>;
@@ -44,7 +43,7 @@ type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>;
pub fn build_transport( pub fn build_transport(
local_private_key: Keypair, local_private_key: Keypair,
quic_support: bool, quic_support: bool,
) -> std::io::Result<(BoxedTransport, AggregatedBandwidthSinks)> { ) -> std::io::Result<BoxedTransport> {
// mplex config // mplex config
let mut mplex_config = libp2p_mplex::MplexConfig::new(); let mut mplex_config = libp2p_mplex::MplexConfig::new();
mplex_config.set_max_buffer_size(256); mplex_config.set_max_buffer_size(256);
@@ -53,44 +52,35 @@ pub fn build_transport(
// yamux config // yamux config
let mut yamux_config = yamux::Config::default(); let mut yamux_config = yamux::Config::default();
yamux_config.set_window_update_mode(yamux::WindowUpdateMode::on_read()); yamux_config.set_window_update_mode(yamux::WindowUpdateMode::on_read());
// Creates the TCP transport layer // Creates the TCP transport layer
let (tcp, tcp_bandwidth) = let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true))
libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true)) .upgrade(core::upgrade::Version::V1)
.upgrade(core::upgrade::Version::V1) .authenticate(generate_noise_config(&local_private_key))
.authenticate(generate_noise_config(&local_private_key)) .multiplex(core::upgrade::SelectUpgrade::new(
.multiplex(core::upgrade::SelectUpgrade::new( yamux_config,
yamux_config, mplex_config,
mplex_config, ))
)) .timeout(Duration::from_secs(10));
.timeout(Duration::from_secs(10)) let transport = if quic_support {
.with_bandwidth_logging();
let (transport, bandwidth) = if quic_support {
// Enables Quic // Enables Quic
// The default quic configuration suits us for now. // The default quic configuration suits us for now.
let quic_config = libp2p_quic::Config::new(&local_private_key); let quic_config = quic::Config::new(&local_private_key);
let (quic, quic_bandwidth) = let quic = quic::tokio::Transport::new(quic_config);
libp2p_quic::tokio::Transport::new(quic_config).with_bandwidth_logging();
let transport = tcp let transport = tcp
.or_transport(quic) .or_transport(quic)
.map(|either_output, _| match either_output { .map(|either_output, _| match either_output {
Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
}) });
.boxed(); transport.boxed()
(
transport,
AggregatedBandwidthSinks::new(tcp_bandwidth, Some(quic_bandwidth)),
)
} else { } else {
(tcp, AggregatedBandwidthSinks::new(tcp_bandwidth, None)) tcp.boxed()
}; };
// Enables DNS over the transport. // Enables DNS over the transport.
let transport = libp2p::dns::tokio::Transport::system(transport)?.boxed(); let transport = libp2p::dns::tokio::Transport::system(transport)?.boxed();
Ok((transport, bandwidth)) Ok(transport)
} }
// Useful helper functions for debugging. Currently not used in the client. // Useful helper functions for debugging. Currently not used in the client.

View File

@@ -113,7 +113,7 @@ pub async fn build_libp2p_instance(
enr_fork_id: EnrForkId::default(), enr_fork_id: EnrForkId::default(),
fork_context: Arc::new(fork_context(fork_name)), fork_context: Arc::new(fork_context(fork_name)),
chain_spec: spec, chain_spec: spec,
gossipsub_registry: None, libp2p_registry: None,
}; };
Libp2pInstance( Libp2pInstance(
LibP2PService::new(executor, libp2p_context, &log) LibP2PService::new(executor, libp2p_context, &log)

View File

@@ -7,8 +7,8 @@ use beacon_chain::{
use fnv::FnvHashMap; use fnv::FnvHashMap;
pub use lighthouse_metrics::*; pub use lighthouse_metrics::*;
use lighthouse_network::{ use lighthouse_network::{
metrics::AggregatedBandwidthSinks, peer_manager::peerdb::client::ClientKind, types::GossipKind, peer_manager::peerdb::client::ClientKind, types::GossipKind, GossipTopic, Gossipsub,
GossipTopic, Gossipsub, NetworkGlobals, NetworkGlobals,
}; };
use std::sync::Arc; use std::sync::Arc;
use strum::IntoEnumIterator; use strum::IntoEnumIterator;
@@ -223,12 +223,6 @@ lazy_static! {
lazy_static! { lazy_static! {
/*
* Bandwidth metrics
*/
pub static ref LIBP2P_BYTES: Result<IntCounterVec> =
try_create_int_counter_vec("libp2p_inbound_bytes", "The bandwidth over libp2p", &["direction", "transport"]);
/* /*
* Sync related metrics * Sync related metrics
*/ */
@@ -327,25 +321,6 @@ lazy_static! {
); );
} }
pub fn update_bandwidth_metrics(bandwidth: &AggregatedBandwidthSinks) {
if let Some(tcp_in_bandwidth) = get_int_counter(&LIBP2P_BYTES, &["inbound", "tcp"]) {
tcp_in_bandwidth.reset();
tcp_in_bandwidth.inc_by(bandwidth.total_tcp_inbound());
}
if let Some(tcp_out_bandwidth) = get_int_counter(&LIBP2P_BYTES, &["outbound", "tcp"]) {
tcp_out_bandwidth.reset();
tcp_out_bandwidth.inc_by(bandwidth.total_tcp_outbound());
}
if let Some(quic_in_bandwidth) = get_int_counter(&LIBP2P_BYTES, &["inbound", "quic"]) {
quic_in_bandwidth.reset();
quic_in_bandwidth.inc_by(bandwidth.total_quic_inbound());
}
if let Some(quic_out_bandwidth) = get_int_counter(&LIBP2P_BYTES, &["outbound", "quic"]) {
quic_out_bandwidth.reset();
quic_out_bandwidth.inc_by(bandwidth.total_quic_outbound());
}
}
pub fn register_finality_update_error(error: &LightClientFinalityUpdateError) { pub fn register_finality_update_error(error: &LightClientFinalityUpdateError) {
inc_counter_vec(&GOSSIP_FINALITY_UPDATE_ERRORS_PER_TYPE, &[error.as_ref()]); inc_counter_vec(&GOSSIP_FINALITY_UPDATE_ERRORS_PER_TYPE, &[error.as_ref()]);
} }

View File

@@ -219,7 +219,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
config: &NetworkConfig, config: &NetworkConfig,
executor: task_executor::TaskExecutor, executor: task_executor::TaskExecutor,
gossipsub_registry: Option<&'_ mut Registry>, libp2p_registry: Option<&'_ mut Registry>,
beacon_processor_send: BeaconProcessorSend<T::EthSpec>, beacon_processor_send: BeaconProcessorSend<T::EthSpec>,
beacon_processor_reprocess_tx: mpsc::Sender<ReprocessQueueMessage>, beacon_processor_reprocess_tx: mpsc::Sender<ReprocessQueueMessage>,
) -> error::Result<( ) -> error::Result<(
@@ -285,7 +285,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
enr_fork_id, enr_fork_id,
fork_context: fork_context.clone(), fork_context: fork_context.clone(),
chain_spec: &beacon_chain.spec, chain_spec: &beacon_chain.spec,
gossipsub_registry, libp2p_registry,
}; };
// launch libp2p service // launch libp2p service
@@ -380,7 +380,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
config: &NetworkConfig, config: &NetworkConfig,
executor: task_executor::TaskExecutor, executor: task_executor::TaskExecutor,
gossipsub_registry: Option<&'_ mut Registry>, libp2p_registry: Option<&'_ mut Registry>,
beacon_processor_send: BeaconProcessorSend<T::EthSpec>, beacon_processor_send: BeaconProcessorSend<T::EthSpec>,
beacon_processor_reprocess_tx: mpsc::Sender<ReprocessQueueMessage>, beacon_processor_reprocess_tx: mpsc::Sender<ReprocessQueueMessage>,
) -> error::Result<(Arc<NetworkGlobals<T::EthSpec>>, NetworkSenders<T::EthSpec>)> { ) -> error::Result<(Arc<NetworkGlobals<T::EthSpec>>, NetworkSenders<T::EthSpec>)> {
@@ -388,7 +388,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
beacon_chain, beacon_chain,
config, config,
executor.clone(), executor.clone(),
gossipsub_registry, libp2p_registry,
beacon_processor_send, beacon_processor_send,
beacon_processor_reprocess_tx, beacon_processor_reprocess_tx,
) )
@@ -497,7 +497,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
} }
} }
} }
metrics::update_bandwidth_metrics(&self.libp2p.bandwidth);
} }
}; };
executor.spawn(service_fut, "network"); executor.spawn(service_fut, "network");

View File

@@ -4,7 +4,7 @@ version = "4.5.0"
authors = ["Sigma Prime <contact@sigmaprime.io>"] authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = { workspace = true } edition = { workspace = true }
autotests = false autotests = false
rust-version = "1.69.0" rust-version = "1.73.0"
[features] [features]
default = ["slasher-lmdb"] default = ["slasher-lmdb"]