diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 089a89ccb7..c30ecbd8f1 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -36,6 +36,7 @@ tokio-util = { version = "0.3.1", features = ["codec", "compat"] } discv5 = { git = "https://github.com/sigp/discv5", rev = "7b3bd40591b62b8c002ffdb85de008aa9f82e2e5" } tiny-keccak = "2.0.2" libp2p-tcp = { version = "0.18.0", default-features = false, features = ["tokio"] } +environment = { path = "../../lighthouse/environment" } [dependencies.libp2p] version = "0.18.1" @@ -49,3 +50,4 @@ slog-stdlog = "4.0.0" slog-term = "2.5.0" slog-async = "2.5.0" tempdir = "0.3.7" +exit-future = "0.2.0" diff --git a/beacon_node/eth2-libp2p/tests/common/mod.rs b/beacon_node/eth2-libp2p/tests/common/mod.rs index 76c35d1675..3af5675fe4 100644 --- a/beacon_node/eth2-libp2p/tests/common/mod.rs +++ b/beacon_node/eth2-libp2p/tests/common/mod.rs @@ -12,6 +12,8 @@ use types::{EnrForkId, MinimalEthSpec}; type E = MinimalEthSpec; use tempdir::TempDir; +type Libp2pInstance = (LibP2PService, exit_future::Signal); + pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger { let decorator = slog_term::TermDecorator::new().build(); let drain = slog_term::FullFormat::new(decorator).build().fuse(); @@ -82,18 +84,19 @@ pub fn build_libp2p_instance( boot_nodes: Vec, secret_key: Option, log: slog::Logger, -) -> LibP2PService { +) -> Libp2pInstance { let port = unused_port("tcp").unwrap(); let config = build_config(port, boot_nodes, secret_key); + let (signal, exit) = exit_future::signal(); + let executor = + environment::TaskExecutor::new(tokio::runtime::Handle::current(), exit, log.clone()); // launch libp2p service - LibP2PService::new( - tokio::runtime::Handle::current(), - &config, - EnrForkId::default(), - &log, + ( + LibP2PService::new(executor, &config, EnrForkId::default(), &log) + .expect("should build libp2p instance") + .1, + signal, ) - .expect("should build libp2p instance") - .1 } #[allow(dead_code)] @@ -104,19 +107,19 @@ pub fn get_enr(node: &LibP2PService) -> Enr { // Returns `n` libp2p peers in fully connected topology. #[allow(dead_code)] -pub fn build_full_mesh(log: slog::Logger, n: usize) -> Vec> { - let mut nodes: Vec> = (0..n) +pub fn build_full_mesh(log: slog::Logger, n: usize) -> Vec { + let mut nodes: Vec<_> = (0..n) .map(|_| build_libp2p_instance(vec![], None, log.clone())) .collect(); let multiaddrs: Vec = nodes .iter() - .map(|x| get_enr(&x).multiaddr()[1].clone()) + .map(|x| get_enr(&x.0).multiaddr()[1].clone()) .collect(); for (i, node) in nodes.iter_mut().enumerate().take(n) { for (j, multiaddr) in multiaddrs.iter().enumerate().skip(i) { if i != j { - match libp2p::Swarm::dial_addr(&mut node.swarm, multiaddr.clone()) { + match libp2p::Swarm::dial_addr(&mut node.0.swarm, multiaddr.clone()) { Ok(()) => debug!(log, "Connected"), Err(_) => error!(log, "Failed to connect"), }; @@ -129,26 +132,27 @@ pub fn build_full_mesh(log: slog::Logger, n: usize) -> Vec> { // Constructs a pair of nodes with separate loggers. The sender dials the receiver. // This returns a (sender, receiver) pair. #[allow(dead_code)] -pub async fn build_node_pair(log: &slog::Logger) -> (LibP2PService, LibP2PService) { +pub async fn build_node_pair(log: &slog::Logger) -> (Libp2pInstance, Libp2pInstance) { let sender_log = log.new(o!("who" => "sender")); let receiver_log = log.new(o!("who" => "receiver")); let mut sender = build_libp2p_instance(vec![], None, sender_log); let mut receiver = build_libp2p_instance(vec![], None, receiver_log); - let receiver_multiaddr = receiver.swarm.discovery().local_enr().clone().multiaddr()[1].clone(); + let receiver_multiaddr = + receiver.0.swarm.discovery().local_enr().clone().multiaddr()[1].clone(); // let the two nodes set up listeners let sender_fut = async { loop { - if let Libp2pEvent::NewListenAddr(_) = sender.next_event().await { + if let Libp2pEvent::NewListenAddr(_) = sender.0.next_event().await { return; } } }; let receiver_fut = async { loop { - if let Libp2pEvent::NewListenAddr(_) = receiver.next_event().await { + if let Libp2pEvent::NewListenAddr(_) = receiver.0.next_event().await { return; } } @@ -162,7 +166,7 @@ pub async fn build_node_pair(log: &slog::Logger) -> (LibP2PService, LibP2PSer _ = joined => {} } - match libp2p::Swarm::dial_addr(&mut sender.swarm, receiver_multiaddr.clone()) { + match libp2p::Swarm::dial_addr(&mut sender.0.swarm, receiver_multiaddr.clone()) { Ok(()) => { debug!(log, "Sender dialed receiver"; "address" => format!("{:?}", receiver_multiaddr)) } @@ -173,16 +177,16 @@ pub async fn build_node_pair(log: &slog::Logger) -> (LibP2PService, LibP2PSer // Returns `n` peers in a linear topology #[allow(dead_code)] -pub fn build_linear(log: slog::Logger, n: usize) -> Vec> { - let mut nodes: Vec> = (0..n) +pub fn build_linear(log: slog::Logger, n: usize) -> Vec { + let mut nodes: Vec<_> = (0..n) .map(|_| build_libp2p_instance(vec![], None, log.clone())) .collect(); let multiaddrs: Vec = nodes .iter() - .map(|x| get_enr(&x).multiaddr()[1].clone()) + .map(|x| get_enr(&x.0).multiaddr()[1].clone()) .collect(); for i in 0..n - 1 { - match libp2p::Swarm::dial_addr(&mut nodes[i].swarm, multiaddrs[i + 1].clone()) { + match libp2p::Swarm::dial_addr(&mut nodes[i].0.swarm, multiaddrs[i + 1].clone()) { Ok(()) => debug!(log, "Connected"), Err(_) => error!(log, "Failed to connect"), }; diff --git a/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs b/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs index 3fcd9b7014..ec1d5e3516 100644 --- a/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs +++ b/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs @@ -43,7 +43,7 @@ async fn test_gossipsub_forward() { let fut = async move { for node in nodes.iter_mut() { loop { - match node.next_event().await { + match node.0.next_event().await { Libp2pEvent::Behaviour(b) => match b { BehaviourEvent::PubsubMessage { topics, @@ -61,7 +61,7 @@ async fn test_gossipsub_forward() { assert_eq!(message, pubsub_message.clone()); received_count += 1; // Since `propagate_message` is false, need to propagate manually - node.swarm.propagate_message(&source, id); + node.0.swarm.propagate_message(&source, id); // Test should succeed if all nodes except the publisher receive the message if received_count == num_nodes - 1 { debug!(log.clone(), "Received message at {} nodes", num_nodes - 1); @@ -74,7 +74,7 @@ async fn test_gossipsub_forward() { subscribed_count += 1; // Every node except the corner nodes are connected to 2 nodes. if subscribed_count == (num_nodes * 2) - 2 { - node.swarm.publish(vec![pubsub_message.clone()]); + node.0.swarm.publish(vec![pubsub_message.clone()]); } } } @@ -129,7 +129,7 @@ async fn test_gossipsub_full_mesh_publish() { topics, message, .. - }) = node.next_event().await + }) = node.0.next_event().await { assert_eq!(topics.len(), 1); // Assert topic is the published topic @@ -146,13 +146,16 @@ async fn test_gossipsub_full_mesh_publish() { } } while let Libp2pEvent::Behaviour(BehaviourEvent::PeerSubscribed(_, topic)) = - publishing_node.next_event().await + publishing_node.0.next_event().await { // Publish on beacon block topic if topic == TopicHash::from_raw(publishing_topic.clone()) { subscribed_count += 1; if subscribed_count == num_nodes - 1 { - publishing_node.swarm.publish(vec![pubsub_message.clone()]); + publishing_node + .0 + .swarm + .publish(vec![pubsub_message.clone()]); } } } diff --git a/beacon_node/eth2-libp2p/tests/noise.rs b/beacon_node/eth2-libp2p/tests/noise.rs index 10fb61ef9d..973fb425c6 100644 --- a/beacon_node/eth2-libp2p/tests/noise.rs +++ b/beacon_node/eth2-libp2p/tests/noise.rs @@ -136,14 +136,12 @@ async fn test_secio_noise_fallback() { let port = common::unused_port("tcp").unwrap(); let noisy_config = common::build_config(port, vec![], None); - let mut noisy_node = Service::new( - tokio::runtime::Handle::current(), - &noisy_config, - EnrForkId::default(), - &log, - ) - .expect("should build a libp2p instance") - .1; + let (_signal, exit) = exit_future::signal(); + let executor = + environment::TaskExecutor::new(tokio::runtime::Handle::current(), exit, log.clone()); + let mut noisy_node = Service::new(executor, &noisy_config, EnrForkId::default(), &log) + .expect("should build a libp2p instance") + .1; let port = common::unused_port("tcp").unwrap(); let secio_config = common::build_config(port, vec![common::get_enr(&noisy_node)], None); diff --git a/beacon_node/eth2-libp2p/tests/rpc_tests.rs b/beacon_node/eth2-libp2p/tests/rpc_tests.rs index 847be08842..222921ca02 100644 --- a/beacon_node/eth2-libp2p/tests/rpc_tests.rs +++ b/beacon_node/eth2-libp2p/tests/rpc_tests.rs @@ -23,7 +23,7 @@ async fn test_status_rpc() { let log = common::build_log(log_level, enable_logging); // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(&log).await; + let ((mut sender, _exit_1), (mut receiver, _exit_2)) = common::build_node_pair(&log).await; // Dummy STATUS RPC message let rpc_request = RPCRequest::Status(StatusMessage { @@ -126,7 +126,7 @@ async fn test_blocks_by_range_chunked_rpc() { let log = common::build_log(log_level, enable_logging); // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(&log).await; + let ((mut sender, _exit_1), (mut receiver, _exit_2)) = common::build_node_pair(&log).await; // BlocksByRange Request let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest { @@ -248,7 +248,7 @@ async fn test_blocks_by_range_chunked_rpc_terminates_correctly() { let log = common::build_log(log_level, enable_logging); // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(&log).await; + let ((mut sender, _exit_1), (mut receiver, _exit_2)) = common::build_node_pair(&log).await; // BlocksByRange Request let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest { @@ -379,7 +379,7 @@ async fn test_blocks_by_range_single_empty_rpc() { let log = common::build_log(log_level, enable_logging); // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(&log).await; + let ((mut sender, _exit_1), (mut receiver, _exit_2)) = common::build_node_pair(&log).await; // BlocksByRange Request let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest { @@ -505,7 +505,7 @@ async fn test_blocks_by_root_chunked_rpc() { let spec = E::default_spec(); // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(&log).await; + let ((mut sender, _exit_1), (mut receiver, _exit_2)) = common::build_node_pair(&log).await; // BlocksByRoot Request let rpc_request = RPCRequest::BlocksByRoot(BlocksByRootRequest { @@ -630,7 +630,7 @@ async fn test_blocks_by_root_chunked_rpc_terminates_correctly() { let spec = E::default_spec(); // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(&log).await; + let ((mut sender, _exit_1), (mut receiver, _exit_2)) = common::build_node_pair(&log).await; // BlocksByRoot Request let rpc_request = RPCRequest::BlocksByRoot(BlocksByRootRequest { @@ -772,7 +772,7 @@ async fn test_goodbye_rpc() { let log = common::build_log(log_level, enable_logging); // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(&log).await; + let ((mut sender, _exit_1), (mut receiver, _exit_2)) = common::build_node_pair(&log).await; // Goodbye Request let rpc_request = RPCRequest::Goodbye(GoodbyeReason::ClientShutdown);