From 4d60694443ce3fd3259989817b6d9542e7bf481c Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Mon, 2 Mar 2020 08:05:20 +0530 Subject: [PATCH] Add support for noise protocol (#873) * Add noise support with fallback to secio * Add config parameter for noise support * Add secio/noise compatibility test * Cleanup * Remove config parameter for noise support * Modify test to work between a secio swarm and a noise libp2p service * Minor fixes --- Cargo.lock | 1 + beacon_node/eth2-libp2p/Cargo.toml | 1 + beacon_node/eth2-libp2p/src/service.rs | 76 +++++++-- beacon_node/eth2-libp2p/tests/common/mod.rs | 9 +- beacon_node/eth2-libp2p/tests/noise.rs | 168 ++++++++++++++++++++ 5 files changed, 238 insertions(+), 17 deletions(-) create mode 100644 beacon_node/eth2-libp2p/tests/noise.rs diff --git a/Cargo.lock b/Cargo.lock index 551fae373b..2fbd271879 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1132,6 +1132,7 @@ dependencies = [ "slog-stdlog 4.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "slog-term 2.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "smallvec 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io-timeout 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "types 0.1.0", diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 73cfaa022d..edee553079 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -36,3 +36,4 @@ base64 = "0.11.0" slog-stdlog = "4.0.0" slog-term = "2.4.2" slog-async = "2.3.0" +tempdir = "0.3" diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 8e8a59719f..f9d5f72108 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -7,11 +7,16 @@ use crate::{NetworkGlobals, Topic, TopicHash}; use futures::prelude::*; use futures::Stream; use libp2p::core::{ - identity::Keypair, multiaddr::Multiaddr, muxing::StreamMuxerBox, nodes::Substream, - transport::boxed::Boxed, ConnectedPoint, + identity::Keypair, + multiaddr::Multiaddr, + muxing::StreamMuxerBox, + nodes::Substream, + transport::boxed::Boxed, + upgrade::{InboundUpgradeExt, OutboundUpgradeExt}, + ConnectedPoint, }; use libp2p::gossipsub::MessageId; -use libp2p::{core, secio, swarm::NetworkBehaviour, PeerId, Swarm, Transport}; +use libp2p::{core, noise, secio, swarm::NetworkBehaviour, PeerId, Swarm, Transport}; use slog::{crit, debug, error, info, trace, warn}; use std::fs::File; use std::io::prelude::*; @@ -68,7 +73,7 @@ impl Service { let network_globals = Arc::new(NetworkGlobals::new(local_peer_id.clone())); let mut swarm = { - // Set up the transport - tcp/ws with secio and mplex/yamux + // Set up the transport - tcp/ws with noise/secio and mplex/yamux let transport = build_transport(local_keypair.clone()); // Lighthouse network behaviour let behaviour = Behaviour::new(&local_keypair, config, network_globals.clone(), &log)?; @@ -250,7 +255,7 @@ impl Stream for Service { } } -/// The implementation supports TCP/IP, WebSockets over TCP/IP, secio as the encryption layer, and +/// The implementation supports TCP/IP, WebSockets over TCP/IP, noise/secio as the encryption layer, and /// mplex or yamux as the multiplexing layer. fn build_transport(local_private_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> { // TODO: The Wire protocol currently doesn't specify encryption and this will need to be customised @@ -262,20 +267,51 @@ fn build_transport(local_private_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox) let trans_clone = transport.clone(); transport.or_transport(websocket::WsConfig::new(trans_clone)) }; - transport - .upgrade(core::upgrade::Version::V1) - .authenticate(secio::SecioConfig::new(local_private_key)) - .multiplex(core::upgrade::SelectUpgrade::new( - libp2p::yamux::Config::default(), - libp2p::mplex::MplexConfig::new(), - )) - .map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer))) - .timeout(Duration::from_secs(20)) + // Authentication + let transport = transport + .and_then(move |stream, endpoint| { + let upgrade = core::upgrade::SelectUpgrade::new( + generate_noise_config(&local_private_key), + secio::SecioConfig::new(local_private_key), + ); + core::upgrade::apply(stream, upgrade, endpoint, core::upgrade::Version::V1).and_then( + move |out| { + match out { + // Noise was negotiated + core::either::EitherOutput::First((remote_id, out)) => { + Ok((core::either::EitherOutput::First(out), remote_id)) + } + // Secio was negotiated + core::either::EitherOutput::Second((remote_id, out)) => { + Ok((core::either::EitherOutput::Second(out), remote_id)) + } + } + }, + ) + }) + .timeout(Duration::from_secs(20)); + + // Multiplexing + let transport = transport + .and_then(move |(stream, peer_id), endpoint| { + let peer_id2 = peer_id.clone(); + let upgrade = core::upgrade::SelectUpgrade::new( + libp2p::yamux::Config::default(), + libp2p::mplex::MplexConfig::new(), + ) + .map_inbound(move |muxer| (peer_id, muxer)) + .map_outbound(move |muxer| (peer_id2, muxer)); + + core::upgrade::apply(stream, upgrade, endpoint, core::upgrade::Version::V1) + .map(|(id, muxer)| (id, core::muxing::StreamMuxerBox::new(muxer))) + }) .timeout(Duration::from_secs(20)) .map_err(|err| Error::new(ErrorKind::Other, err)) - .boxed() + .boxed(); + transport } +#[derive(Debug)] /// Events that can be obtained from polling the Libp2p Service. pub enum Libp2pEvent { /// An RPC response request has been received on the swarm. @@ -363,3 +399,13 @@ fn load_private_key(config: &NetworkConfig, log: &slog::Logger) -> Keypair { } local_private_key } + +/// Generate authenticated XX Noise config from identity keys +fn generate_noise_config( + identity_keypair: &Keypair, +) -> noise::NoiseAuthenticated { + let static_dh_keys = noise::Keypair::::new() + .into_authentic(identity_keypair) + .expect("signing can fail only once during starting a node"); + noise::NoiseConfig::xx(static_dh_keys).into_authenticated() +} diff --git a/beacon_node/eth2-libp2p/tests/common/mod.rs b/beacon_node/eth2-libp2p/tests/common/mod.rs index 18b9c42f9b..035b6caefb 100644 --- a/beacon_node/eth2-libp2p/tests/common/mod.rs +++ b/beacon_node/eth2-libp2p/tests/common/mod.rs @@ -5,6 +5,7 @@ use eth2_libp2p::NetworkConfig; use eth2_libp2p::Service as LibP2PService; use slog::{debug, error, o, Drain}; use std::time::Duration; +use tempdir::TempDir; pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger { let decorator = slog_term::TermDecorator::new().build(); @@ -24,11 +25,13 @@ pub fn build_config( secret_key: Option, ) -> NetworkConfig { let mut config = NetworkConfig::default(); + let path = TempDir::new(&format!("libp2p_test{}", port)).unwrap(); + config.libp2p_port = port; // tcp port config.discovery_port = port; // udp port config.boot_nodes.append(&mut boot_nodes); config.secret_key_hex = secret_key; - config.network_dir.push(port.to_string()); + config.network_dir = path.into_path(); // Reduce gossipsub heartbeat parameters config.gs_config.heartbeat_initial_delay = Duration::from_millis(500); config.gs_config.heartbeat_interval = Duration::from_millis(500); @@ -43,7 +46,9 @@ pub fn build_libp2p_instance( ) -> LibP2PService { let config = build_config(port, boot_nodes, secret_key); // launch libp2p service - LibP2PService::new(&config, log.clone()).unwrap().1 + LibP2PService::new(&config, log.clone()) + .expect("should build libp2p instance") + .1 } #[allow(dead_code)] diff --git a/beacon_node/eth2-libp2p/tests/noise.rs b/beacon_node/eth2-libp2p/tests/noise.rs new file mode 100644 index 0000000000..5b322d3664 --- /dev/null +++ b/beacon_node/eth2-libp2p/tests/noise.rs @@ -0,0 +1,168 @@ +#![cfg(test)] +use crate::behaviour::{Behaviour, BehaviourEvent}; +use crate::multiaddr::Protocol; +use eth2_libp2p::*; +use futures::prelude::*; +use libp2p::core::identity::Keypair; +use libp2p::{ + core, + core::{muxing::StreamMuxerBox, nodes::Substream, transport::boxed::Boxed}, + secio, PeerId, Swarm, Transport, +}; +use slog::{crit, debug, info, Level}; +use std::io::{Error, ErrorKind}; +use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; +use std::sync::Arc; +use std::time::Duration; +use tokio::prelude::*; + +mod common; + +type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>; +type Libp2pBehaviour = Behaviour>; + +/// Build and return a eth2_libp2p Swarm with only secio support. +fn build_secio_swarm( + config: &NetworkConfig, + log: slog::Logger, +) -> error::Result> { + let local_keypair = Keypair::generate_secp256k1(); + let local_peer_id = PeerId::from(local_keypair.public()); + + let network_globals = Arc::new(NetworkGlobals::new(local_peer_id.clone())); + + let mut swarm = { + // Set up the transport - tcp/ws with secio and mplex/yamux + let transport = build_secio_transport(local_keypair.clone()); + // Lighthouse network behaviour + let behaviour = Behaviour::new(&local_keypair, config, network_globals.clone(), &log)?; + Swarm::new(transport, behaviour, local_peer_id.clone()) + }; + + // listen on the specified address + let listen_multiaddr = { + let mut m = Multiaddr::from(config.listen_address); + m.push(Protocol::Tcp(config.libp2p_port)); + m + }; + + match Swarm::listen_on(&mut swarm, listen_multiaddr.clone()) { + Ok(_) => { + let mut log_address = listen_multiaddr; + log_address.push(Protocol::P2p(local_peer_id.clone().into())); + info!(log, "Listening established"; "address" => format!("{}", log_address)); + } + Err(err) => { + crit!( + log, + "Unable to listen on libp2p address"; + "error" => format!("{:?}", err), + "listen_multiaddr" => format!("{}", listen_multiaddr), + ); + return Err("Libp2p was unable to listen on the given listen address.".into()); + } + }; + + // helper closure for dialing peers + let mut dial_addr = |multiaddr: &Multiaddr| { + match Swarm::dial_addr(&mut swarm, multiaddr.clone()) { + Ok(()) => debug!(log, "Dialing libp2p peer"; "address" => format!("{}", multiaddr)), + Err(err) => debug!( + log, + "Could not connect to peer"; "address" => format!("{}", multiaddr), "error" => format!("{:?}", err) + ), + }; + }; + + // attempt to connect to any specified boot-nodes + for bootnode_enr in &config.boot_nodes { + for multiaddr in &bootnode_enr.multiaddr() { + // ignore udp multiaddr if it exists + let components = multiaddr.iter().collect::>(); + if let Protocol::Udp(_) = components[1] { + continue; + } + dial_addr(multiaddr); + } + } + Ok(swarm) +} + +/// Build a simple TCP transport with secio, mplex/yamux. +fn build_secio_transport(local_private_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> { + let transport = libp2p::tcp::TcpConfig::new().nodelay(true); + transport + .upgrade(core::upgrade::Version::V1) + .authenticate(secio::SecioConfig::new(local_private_key)) + .multiplex(core::upgrade::SelectUpgrade::new( + libp2p::yamux::Config::default(), + libp2p::mplex::MplexConfig::new(), + )) + .map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer))) + .timeout(Duration::from_secs(20)) + .timeout(Duration::from_secs(20)) + .map_err(|err| Error::new(ErrorKind::Other, err)) + .boxed() +} + +/// Test if the encryption falls back to secio if noise isn't available +#[test] +fn test_secio_noise_fallback() { + // set up the logging. The level and enabled logging or not + let log_level = Level::Trace; + let enable_logging = true; + + let log = common::build_log(log_level, enable_logging); + + let noisy_config = common::build_config(56010, vec![], None); + let mut noisy_node = Service::new(&noisy_config, log.clone()) + .expect("should build a libp2p instance") + .1; + + let secio_config = common::build_config(56011, vec![common::get_enr(&noisy_node)], None); + + // Building a custom Libp2pService from outside the crate isn't possible because of + // private fields in the Libp2pService struct. A swarm is good enough for testing + // compatibility with secio. + let mut secio_swarm = + build_secio_swarm(&secio_config, log.clone()).expect("should build a secio swarm"); + + let secio_log = log.clone(); + + let noisy_future = future::poll_fn(move || -> Poll { + loop { + match noisy_node.poll().unwrap() { + _ => return Ok(Async::NotReady), + } + } + }); + + let secio_future = future::poll_fn(move || -> Poll { + loop { + match secio_swarm.poll().unwrap() { + Async::Ready(Some(BehaviourEvent::PeerDialed(peer_id))) => { + // secio node negotiated a secio transport with + // the noise compatible node + info!(secio_log, "Connected to peer {}", peer_id); + return Ok(Async::Ready(true)); + } + _ => return Ok(Async::NotReady), + } + } + }); + + // execute the futures and check the result + let test_result = Arc::new(AtomicBool::new(false)); + let error_result = test_result.clone(); + let thread_result = test_result.clone(); + tokio::run( + noisy_future + .select(secio_future) + .timeout(Duration::from_millis(1000)) + .map_err(move |_| error_result.store(false, Relaxed)) + .map(move |result| { + thread_result.store(result.0, Relaxed); + }), + ); + assert!(test_result.load(Relaxed)); +}