From c444a47f3c862cf76d8fb44736947ad2bd2937f5 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Tue, 5 May 2020 15:05:37 +0530 Subject: [PATCH] Fix the fallback transport construction (#1102) --- beacon_node/eth2-libp2p/src/service.rs | 61 +++++++++++++++++++------- 1 file changed, 44 insertions(+), 17 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index cde39bf18b..b5252d7a1c 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -7,13 +7,11 @@ use crate::{NetworkConfig, NetworkGlobals}; use futures::prelude::*; use futures::Stream; use libp2p::core::{ - connection::Substream, identity::Keypair, multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::boxed::Boxed, upgrade::{InboundUpgradeExt, OutboundUpgradeExt}, - ConnectedPoint, }; use libp2p::{core, noise, secio, swarm::NetworkBehaviour, PeerId, Swarm, Transport}; use slog::{crit, debug, error, info, trace, warn}; @@ -84,7 +82,8 @@ impl Service { let mut swarm = { // Set up the transport - tcp/ws with noise/secio and mplex/yamux - let transport = build_transport(local_keypair.clone()); + let transport = build_transport(local_keypair.clone()) + .map_err(|e| format!("Failed to build transport: {:?}", e))?; // Lighthouse network behaviour let behaviour = Behaviour::new(&local_keypair, config, network_globals.clone(), &log)?; Swarm::new(transport, behaviour, local_peer_id.clone()) @@ -238,7 +237,7 @@ impl Stream for Service { /// mplex or yamux as the multiplexing layer. fn build_transport( local_private_key: Keypair, -) -> Result, io::Error> { +) -> Result, Error> { // TODO: The Wire protocol currently doesn't specify encryption and this will need to be customised // in the future. let transport = libp2p::tcp::TcpConfig::new().nodelay(true); @@ -247,21 +246,49 @@ fn build_transport( let transport = { let trans_clone = transport.clone(); transport.or_transport(websocket::WsConfig::new(trans_clone)) - }?; - transport - .upgrade(core::upgrade::Version::V1) - .authenticate(core::upgrade::SelectUpgrade::new( - generate_noise_config(&local_private_key), - secio::SecioConfig::new(local_private_key), - )) - .multiplex(core::upgrade::SelectUpgrade::new( - yamux::Config::default(), - mplex::MplexConfig::new(), - )) - .map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer))) + }; + // Authentication + let transport = transport + .and_then(move |stream, endpoint| { + let upgrade = core::upgrade::SelectUpgrade::new( + generate_noise_config(&local_private_key), + secio::SecioConfig::new(local_private_key), + ); + core::upgrade::apply(stream, upgrade, endpoint, core::upgrade::Version::V1).and_then( + |out| async move { + match out { + // Noise was negotiated + core::either::EitherOutput::First((remote_id, out)) => { + Ok((core::either::EitherOutput::First(out), remote_id)) + } + // Secio was negotiated + core::either::EitherOutput::Second((remote_id, out)) => { + Ok((core::either::EitherOutput::Second(out), remote_id)) + } + } + }, + ) + }) + .timeout(Duration::from_secs(20)); + + // Multiplexing + let transport = transport + .and_then(move |(stream, peer_id), endpoint| { + let peer_id2 = peer_id.clone(); + let upgrade = core::upgrade::SelectUpgrade::new( + libp2p::yamux::Config::default(), + libp2p::mplex::MplexConfig::new(), + ) + .map_inbound(move |muxer| (peer_id, muxer)) + .map_outbound(move |muxer| (peer_id2, muxer)); + + core::upgrade::apply(stream, upgrade, endpoint, core::upgrade::Version::V1) + .map_ok(|(id, muxer)| (id, core::muxing::StreamMuxerBox::new(muxer))) + }) .timeout(Duration::from_secs(20)) .map_err(|err| Error::new(ErrorKind::Other, err)) - .boxed() + .boxed(); + Ok(transport) } fn keypair_from_hex(hex_bytes: &str) -> error::Result {