Fix the fallback transport construction (#1102)

This commit is contained in:
Pawan Dhananjay
2020-05-05 15:05:37 +05:30
committed by GitHub
parent 1ccf83b574
commit c444a47f3c

View File

@@ -7,13 +7,11 @@ use crate::{NetworkConfig, NetworkGlobals};
use futures::prelude::*;
use futures::Stream;
use libp2p::core::{
connection::Substream,
identity::Keypair,
multiaddr::Multiaddr,
muxing::StreamMuxerBox,
transport::boxed::Boxed,
upgrade::{InboundUpgradeExt, OutboundUpgradeExt},
ConnectedPoint,
};
use libp2p::{core, noise, secio, swarm::NetworkBehaviour, PeerId, Swarm, Transport};
use slog::{crit, debug, error, info, trace, warn};
@@ -84,7 +82,8 @@ impl<TSpec: EthSpec> Service<TSpec> {
let mut swarm = {
// Set up the transport - tcp/ws with noise/secio and mplex/yamux
let transport = build_transport(local_keypair.clone());
let transport = build_transport(local_keypair.clone())
.map_err(|e| format!("Failed to build transport: {:?}", e))?;
// Lighthouse network behaviour
let behaviour = Behaviour::new(&local_keypair, config, network_globals.clone(), &log)?;
Swarm::new(transport, behaviour, local_peer_id.clone())
@@ -238,7 +237,7 @@ impl<TSpec: EthSpec> Stream for Service<TSpec> {
/// mplex or yamux as the multiplexing layer.
fn build_transport(
local_private_key: Keypair,
) -> Result<Boxed<(PeerId, StreamMuxerBox), Error>, io::Error> {
) -> Result<Boxed<(PeerId, StreamMuxerBox), Error>, Error> {
// TODO: The Wire protocol currently doesn't specify encryption and this will need to be customised
// in the future.
let transport = libp2p::tcp::TcpConfig::new().nodelay(true);
@@ -247,21 +246,49 @@ fn build_transport(
let transport = {
let trans_clone = transport.clone();
transport.or_transport(websocket::WsConfig::new(trans_clone))
}?;
transport
.upgrade(core::upgrade::Version::V1)
.authenticate(core::upgrade::SelectUpgrade::new(
generate_noise_config(&local_private_key),
secio::SecioConfig::new(local_private_key),
))
.multiplex(core::upgrade::SelectUpgrade::new(
yamux::Config::default(),
mplex::MplexConfig::new(),
))
.map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer)))
};
// Authentication
let transport = transport
.and_then(move |stream, endpoint| {
let upgrade = core::upgrade::SelectUpgrade::new(
generate_noise_config(&local_private_key),
secio::SecioConfig::new(local_private_key),
);
core::upgrade::apply(stream, upgrade, endpoint, core::upgrade::Version::V1).and_then(
|out| async move {
match out {
// Noise was negotiated
core::either::EitherOutput::First((remote_id, out)) => {
Ok((core::either::EitherOutput::First(out), remote_id))
}
// Secio was negotiated
core::either::EitherOutput::Second((remote_id, out)) => {
Ok((core::either::EitherOutput::Second(out), remote_id))
}
}
},
)
})
.timeout(Duration::from_secs(20));
// Multiplexing
let transport = transport
.and_then(move |(stream, peer_id), endpoint| {
let peer_id2 = peer_id.clone();
let upgrade = core::upgrade::SelectUpgrade::new(
libp2p::yamux::Config::default(),
libp2p::mplex::MplexConfig::new(),
)
.map_inbound(move |muxer| (peer_id, muxer))
.map_outbound(move |muxer| (peer_id2, muxer));
core::upgrade::apply(stream, upgrade, endpoint, core::upgrade::Version::V1)
.map_ok(|(id, muxer)| (id, core::muxing::StreamMuxerBox::new(muxer)))
})
.timeout(Duration::from_secs(20))
.map_err(|err| Error::new(ErrorKind::Other, err))
.boxed()
.boxed();
Ok(transport)
}
fn keypair_from_hex(hex_bytes: &str) -> error::Result<Keypair> {