diff --git a/Cargo.lock b/Cargo.lock index 860352139c..ab91881955 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -47,40 +47,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d2e7343e7fc9de883d1b0341e0b13970f764c14101234857d2ddafa1cb1cac2" -[[package]] -name = "aes-ctr" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2e5b0458ea3beae0d1d8c0f3946564f8e10f90646cf78c06b4351052058d1ee" -dependencies = [ - "aes-soft", - "aesni", - "ctr", - "stream-cipher", -] - -[[package]] -name = "aes-soft" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfd7e7ae3f9a1fb5c03b389fc6bb9a51400d0c13053f0dca698c832bfd893a0d" -dependencies = [ - "block-cipher-trait", - "byteorder", - "opaque-debug", -] - -[[package]] -name = "aesni" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f70a6b5f971e473091ab7cfb5ffac6cde81666c4556751d8d5620ead8abf100" -dependencies = [ - "block-cipher-trait", - "opaque-debug", - "stream-cipher", -] - [[package]] name = "ahash" version = "0.2.18" @@ -425,15 +391,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "block-cipher-trait" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c924d49bd09e7c06003acda26cd9742e796e34282ec6c1189404dee0c1f4774" -dependencies = [ - "generic-array", -] - [[package]] name = "block-padding" version = "0.1.5" @@ -919,16 +876,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "ctr" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "022cd691704491df67d25d006fe8eca083098253c4d43516c2206479c58c6736" -dependencies = [ - "block-cipher-trait", - "stream-cipher", -] - [[package]] name = "ctrlc" version = "3.1.4" @@ -2365,7 +2312,6 @@ dependencies = [ "libp2p-identify", "libp2p-mplex", "libp2p-noise", - "libp2p-secio", "libp2p-swarm", "libp2p-websocket", "libp2p-yamux", @@ -2510,36 +2456,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "libp2p-secio" -version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b73f0cc119c83a5b619d6d11074a319fdb4aa4daf8088ade00d511418566e28" -dependencies = [ - "aes-ctr", - "ctr", - "futures 0.3.5", - "hmac", - "js-sys", - "lazy_static", - "libp2p-core", - "log 0.4.8", - "parity-send-wrapper", - "pin-project", - "prost", - "prost-build", - "quicksink", - "rand 0.7.3", - "ring", - "rw-stream-sink", - "sha2", - "static_assertions", - "twofish", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", -] - [[package]] name = "libp2p-swarm" version = "0.19.0" @@ -3241,12 +3157,6 @@ dependencies = [ "serde", ] -[[package]] -name = "parity-send-wrapper" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa9777aa91b8ad9dd5aaa04a9b6bcb02c7f1deb952fca5a66034d5e63afc5c6f" - [[package]] name = "parking_lot" version = "0.9.0" @@ -4691,15 +4601,6 @@ dependencies = [ "types", ] -[[package]] -name = "stream-cipher" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8131256a5896cabcf5eb04f4d6dacbe1aefda854b0d9896e09cb58829ec5638c" -dependencies = [ - "generic-array", -] - [[package]] name = "string" version = "0.2.1" @@ -5356,17 +5257,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e604eb7b43c06650e854be16a2a03155743d3752dd1c943f6829e26b7a36e382" -[[package]] -name = "twofish" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "712d261e83e727c8e2dbb75dacac67c36e35db36a958ee504f2164fc052434e1" -dependencies = [ - "block-cipher-trait", - "byteorder", - "opaque-debug", -] - [[package]] name = "typeable" version = "0.1.2" diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 8b64e6ad92..5df8216a5d 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -41,7 +41,7 @@ libp2p-tcp = { version = "0.19.1", default-features = false, features = ["tokio" [dependencies.libp2p] version = "0.19.1" default-features = false -features = ["websocket", "identify", "mplex", "yamux", "noise", "secio", "gossipsub", "dns"] +features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns"] [dev-dependencies] diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index e527ef7b63..ac4c778a6c 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -7,15 +7,11 @@ use crate::EnrExt; use crate::{NetworkConfig, NetworkGlobals}; use futures::prelude::*; use libp2p::core::{ - identity::Keypair, - multiaddr::Multiaddr, - muxing::StreamMuxerBox, - transport::boxed::Boxed, - upgrade::{InboundUpgradeExt, OutboundUpgradeExt}, + identity::Keypair, multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::boxed::Boxed, ConnectedPoint, }; use libp2p::{ - core, noise, secio, + core, noise, swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, PeerId, Swarm, Transport, }; @@ -117,7 +113,7 @@ impl Service { debug!(log, "Attempting to open listening ports"; "address" => format!("{}", config.listen_address), "tcp_port" => config.libp2p_port, "udp_port" => config.discovery_port); let mut swarm = { - // Set up the transport - tcp/ws with noise/secio and mplex/yamux + // Set up the transport - tcp/ws with noise and yamux/mplex let transport = build_transport(local_keypair.clone()) .map_err(|e| format!("Failed to build transport: {:?}", e))?; // Lighthouse network behaviour @@ -376,8 +372,8 @@ impl Service { } } -/// The implementation supports TCP/IP, WebSockets over TCP/IP, noise/secio as the encryption -/// layer, and mplex or yamux as the multiplexing layer. +/// The implementation supports TCP/IP, WebSockets over TCP/IP, noise as the encryption layer, and +/// yamux or mplex as the multiplexing layer. fn build_transport( local_private_key: Keypair, ) -> Result, Error> { @@ -389,47 +385,18 @@ fn build_transport( transport.or_transport(websocket::WsConfig::new(trans_clone)) }; // Authentication - let transport = transport - .and_then(move |stream, endpoint| { - let upgrade = core::upgrade::SelectUpgrade::new( - generate_noise_config(&local_private_key), - secio::SecioConfig::new(local_private_key), - ); - core::upgrade::apply(stream, upgrade, endpoint, core::upgrade::Version::V1).and_then( - |out| async move { - match out { - // Noise was negotiated - core::either::EitherOutput::First((remote_id, out)) => { - Ok((core::either::EitherOutput::First(out), remote_id)) - } - // Secio was negotiated - core::either::EitherOutput::Second((remote_id, out)) => { - Ok((core::either::EitherOutput::Second(out), remote_id)) - } - } - }, - ) - }) - .timeout(Duration::from_secs(20)); - - // Multiplexing - let transport = transport - .and_then(move |(stream, peer_id), endpoint| { - let peer_id2 = peer_id.clone(); - let upgrade = core::upgrade::SelectUpgrade::new( - libp2p::yamux::Config::default(), - libp2p::mplex::MplexConfig::new(), - ) - .map_inbound(move |muxer| (peer_id, muxer)) - .map_outbound(move |muxer| (peer_id2, muxer)); - - core::upgrade::apply(stream, upgrade, endpoint, core::upgrade::Version::V1) - .map_ok(|(id, muxer)| (id, core::muxing::StreamMuxerBox::new(muxer))) - }) + Ok(transport + .upgrade(core::upgrade::Version::V1) + .authenticate(generate_noise_config(&local_private_key)) + .multiplex(core::upgrade::SelectUpgrade::new( + libp2p::yamux::Config::default(), + libp2p::mplex::MplexConfig::new(), + )) + .map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer))) + .timeout(Duration::from_secs(20)) .timeout(Duration::from_secs(20)) .map_err(|err| Error::new(ErrorKind::Other, err)) - .boxed(); - Ok(transport) + .boxed()) } fn keypair_from_hex(hex_bytes: &str) -> error::Result { diff --git a/beacon_node/eth2-libp2p/src/types/pubsub.rs b/beacon_node/eth2-libp2p/src/types/pubsub.rs index 279dde28b5..e9cef46c67 100644 --- a/beacon_node/eth2-libp2p/src/types/pubsub.rs +++ b/beacon_node/eth2-libp2p/src/types/pubsub.rs @@ -68,41 +68,37 @@ impl PubsubMessage { continue; } Ok(gossip_topic) => { - let mut decompressed_data: Vec = Vec::new(); - let data = match gossip_topic.encoding() { - // group each part by encoding type + let ref decompressed_data = match gossip_topic.encoding() { GossipEncoding::SSZSnappy => { + // Exit early if uncompressed data is > GOSSIP_MAX_SIZE match decompress_len(data) { Ok(n) if n > GOSSIP_MAX_SIZE => { return Err("ssz_snappy decoded data > GOSSIP_MAX_SIZE".into()); } - Ok(n) => decompressed_data.resize(n, 0), + Ok(_) => {} Err(e) => { return Err(format!("{}", e)); } }; let mut decoder = Decoder::new(); - match decoder.decompress(data, &mut decompressed_data) { - Ok(n) => { - decompressed_data.truncate(n); - &decompressed_data - } + match decoder.decompress_vec(data) { + Ok(decompressed_data) => decompressed_data, Err(e) => return Err(format!("{}", e)), } } - GossipEncoding::SSZ => data, }; // the ssz decoders match gossip_topic.kind() { GossipKind::BeaconAggregateAndProof => { - let agg_and_proof = SignedAggregateAndProof::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?; + let agg_and_proof = + SignedAggregateAndProof::from_ssz_bytes(decompressed_data) + .map_err(|e| format!("{:?}", e))?; return Ok(PubsubMessage::AggregateAndProofAttestation(Box::new( agg_and_proof, ))); } GossipKind::CommitteeIndex(subnet_id) => { - let attestation = Attestation::from_ssz_bytes(data) + let attestation = Attestation::from_ssz_bytes(decompressed_data) .map_err(|e| format!("{:?}", e))?; return Ok(PubsubMessage::Attestation(Box::new(( *subnet_id, @@ -110,25 +106,27 @@ impl PubsubMessage { )))); } GossipKind::BeaconBlock => { - let beacon_block = SignedBeaconBlock::from_ssz_bytes(data) + let beacon_block = SignedBeaconBlock::from_ssz_bytes(decompressed_data) .map_err(|e| format!("{:?}", e))?; return Ok(PubsubMessage::BeaconBlock(Box::new(beacon_block))); } GossipKind::VoluntaryExit => { - let voluntary_exit = VoluntaryExit::from_ssz_bytes(data) + let voluntary_exit = VoluntaryExit::from_ssz_bytes(decompressed_data) .map_err(|e| format!("{:?}", e))?; return Ok(PubsubMessage::VoluntaryExit(Box::new(voluntary_exit))); } GossipKind::ProposerSlashing => { - let proposer_slashing = ProposerSlashing::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?; + let proposer_slashing = + ProposerSlashing::from_ssz_bytes(decompressed_data) + .map_err(|e| format!("{:?}", e))?; return Ok(PubsubMessage::ProposerSlashing(Box::new( proposer_slashing, ))); } GossipKind::AttesterSlashing => { - let attester_slashing = AttesterSlashing::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?; + let attester_slashing = + AttesterSlashing::from_ssz_bytes(decompressed_data) + .map_err(|e| format!("{:?}", e))?; return Ok(PubsubMessage::AttesterSlashing(Box::new( attester_slashing, ))); @@ -152,13 +150,6 @@ impl PubsubMessage { PubsubMessage::Attestation(data) => data.1.as_ssz_bytes(), }; match encoding { - GossipEncoding::SSZ => { - if data.len() > GOSSIP_MAX_SIZE { - return Err("ssz encoded data > GOSSIP_MAX_SIZE".into()); - } else { - Ok(data) - } - } GossipEncoding::SSZSnappy => { let mut encoder = Encoder::new(); match encoder.compress_vec(&data) { diff --git a/beacon_node/eth2-libp2p/src/types/topics.rs b/beacon_node/eth2-libp2p/src/types/topics.rs index ebb067e6b9..ce536e2e15 100644 --- a/beacon_node/eth2-libp2p/src/types/topics.rs +++ b/beacon_node/eth2-libp2p/src/types/topics.rs @@ -6,7 +6,6 @@ use types::SubnetId; // These constants form a topic name of the form /TOPIC_PREFIX/TOPIC/ENCODING_POSTFIX // For example /eth2/beacon_block/ssz pub const TOPIC_PREFIX: &str = "eth2"; -pub const SSZ_ENCODING_POSTFIX: &str = "ssz"; pub const SSZ_SNAPPY_ENCODING_POSTFIX: &str = "ssz_snappy"; pub const BEACON_BLOCK_TOPIC: &str = "beacon_block"; pub const BEACON_AGGREGATE_AND_PROOF_TOPIC: &str = "beacon_aggregate_and_proof"; @@ -64,8 +63,6 @@ impl std::fmt::Display for GossipKind { /// The known encoding types for gossipsub messages. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] pub enum GossipEncoding { - /// Messages are encoded with SSZ. - SSZ, /// Messages are encoded with SSZSnappy. SSZSnappy, } @@ -117,7 +114,6 @@ impl GossipTopic { fork_digest.copy_from_slice(&digest_bytes); let encoding = match topic_parts[4] { - SSZ_ENCODING_POSTFIX => GossipEncoding::SSZ, SSZ_SNAPPY_ENCODING_POSTFIX => GossipEncoding::SSZSnappy, _ => return Err(format!("Unknown encoding: {}", topic)), }; @@ -153,7 +149,6 @@ impl Into for GossipTopic { impl Into for GossipTopic { fn into(self) -> String { let encoding = match self.encoding { - GossipEncoding::SSZ => SSZ_ENCODING_POSTFIX, GossipEncoding::SSZSnappy => SSZ_SNAPPY_ENCODING_POSTFIX, }; diff --git a/beacon_node/eth2-libp2p/tests/noise.rs b/beacon_node/eth2-libp2p/tests/noise.rs deleted file mode 100644 index 973fb425c6..0000000000 --- a/beacon_node/eth2-libp2p/tests/noise.rs +++ /dev/null @@ -1,184 +0,0 @@ -#![cfg(test)] -use crate::behaviour::Behaviour; -use crate::multiaddr::Protocol; -use ::types::{EnrForkId, MinimalEthSpec}; -use eth2_libp2p::discovery::{build_enr, CombinedKey, CombinedKeyExt}; -use eth2_libp2p::*; -use futures::prelude::*; -use libp2p::core::identity::Keypair; -use libp2p::{ - core, - core::{muxing::StreamMuxerBox, transport::boxed::Boxed}, - secio, - swarm::{SwarmBuilder, SwarmEvent}, - PeerId, Swarm, Transport, -}; -use slog::{crit, debug, info, Level}; -use std::io::{Error, ErrorKind}; -use std::pin::Pin; -use std::sync::Arc; -use std::time::Duration; - -type TSpec = MinimalEthSpec; - -mod common; - -type Libp2pBehaviour = Behaviour; - -/// Build and return a eth2_libp2p Swarm with only secio support. -fn build_secio_swarm( - config: &NetworkConfig, - log: slog::Logger, -) -> error::Result> { - let local_keypair = Keypair::generate_secp256k1(); - let local_peer_id = PeerId::from(local_keypair.public()); - let enr_key = CombinedKey::from_libp2p(&local_keypair).unwrap(); - - let enr = build_enr::(&enr_key, config, EnrForkId::default()).unwrap(); - let network_globals = Arc::new(NetworkGlobals::new( - enr, - config.libp2p_port, - config.discovery_port, - &log, - )); - - let mut swarm = { - // Set up the transport - tcp/ws with secio and mplex/yamux - let transport = build_secio_transport(local_keypair.clone()); - // Lighthouse network behaviour - let behaviour = Behaviour::new(&local_keypair, config, network_globals.clone(), &log)?; - // requires a tokio runtime - struct Executor(tokio::runtime::Handle); - impl libp2p::core::Executor for Executor { - fn exec(&self, f: Pin + Send>>) { - self.0.spawn(f); - } - } - SwarmBuilder::new(transport, behaviour, local_peer_id.clone()) - .executor(Box::new(Executor(tokio::runtime::Handle::current()))) - .build() - }; - - // listen on the specified address - let listen_multiaddr = { - let mut m = Multiaddr::from(config.listen_address); - m.push(Protocol::Tcp(config.libp2p_port)); - m - }; - - match Swarm::listen_on(&mut swarm, listen_multiaddr.clone()) { - Ok(_) => { - let mut log_address = listen_multiaddr; - log_address.push(Protocol::P2p(local_peer_id.clone().into())); - info!(log, "Listening established"; "address" => format!("{}", log_address)); - } - Err(err) => { - crit!( - log, - "Unable to listen on libp2p address"; - "error" => format!("{:?}", err), - "listen_multiaddr" => format!("{}", listen_multiaddr), - ); - return Err("Libp2p was unable to listen on the given listen address.".into()); - } - }; - - // helper closure for dialing peers - let mut dial_addr = |multiaddr: &Multiaddr| { - match Swarm::dial_addr(&mut swarm, multiaddr.clone()) { - Ok(()) => debug!(log, "Dialing libp2p peer"; "address" => format!("{}", multiaddr)), - Err(err) => debug!( - log, - "Could not connect to peer"; "address" => format!("{}", multiaddr), "error" => format!("{:?}", err) - ), - }; - }; - - // attempt to connect to any specified boot-nodes - for bootnode_enr in &config.boot_nodes { - for multiaddr in &bootnode_enr.multiaddr() { - // ignore udp multiaddr if it exists - let components = multiaddr.iter().collect::>(); - if let Protocol::Udp(_) = components[1] { - continue; - } - dial_addr(multiaddr); - } - } - Ok(swarm) -} - -/// Build a simple TCP transport with secio, mplex/yamux. -fn build_secio_transport(local_private_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> { - let transport = libp2p_tcp::TokioTcpConfig::new().nodelay(true); - transport - .upgrade(core::upgrade::Version::V1) - .authenticate(secio::SecioConfig::new(local_private_key)) - .multiplex(core::upgrade::SelectUpgrade::new( - libp2p::yamux::Config::default(), - libp2p::mplex::MplexConfig::new(), - )) - .map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer))) - .timeout(Duration::from_secs(20)) - .timeout(Duration::from_secs(20)) - .map_err(|err| Error::new(ErrorKind::Other, err)) - .boxed() -} - -/// Test if the encryption falls back to secio if noise isn't available -#[tokio::test] -async fn test_secio_noise_fallback() { - // set up the logging. The level and enabled logging or not - let log_level = Level::Trace; - let enable_logging = false; - - let log = common::build_log(log_level, enable_logging); - - let port = common::unused_port("tcp").unwrap(); - let noisy_config = common::build_config(port, vec![], None); - let (_signal, exit) = exit_future::signal(); - let executor = - environment::TaskExecutor::new(tokio::runtime::Handle::current(), exit, log.clone()); - let mut noisy_node = Service::new(executor, &noisy_config, EnrForkId::default(), &log) - .expect("should build a libp2p instance") - .1; - - let port = common::unused_port("tcp").unwrap(); - let secio_config = common::build_config(port, vec![common::get_enr(&noisy_node)], None); - - // Building a custom Libp2pService from outside the crate isn't possible because of - // private fields in the Libp2pService struct. A swarm is good enough for testing - // compatibility with secio. - let mut secio_swarm = - build_secio_swarm(&secio_config, log.clone()).expect("should build a secio swarm"); - - let secio_log = log.clone(); - - let noisy_future = async { - loop { - noisy_node.next_event().await; - } - }; - - let secio_future = async { - loop { - match secio_swarm.next_event().await { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - // secio node negotiated a secio transport with - // the noise compatible node - info!(secio_log, "Connected to peer {}", peer_id); - return; - } - _ => {} // Ignore all other events - } - } - }; - - tokio::select! { - _ = noisy_future => {} - _ = secio_future => {} - _ = tokio::time::delay_for(Duration::from_millis(800)) => { - panic!("Future timed out"); - } - } -}