mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-07 16:55:46 +00:00
Ugly fix libp2p tests
This commit is contained in:
@@ -36,6 +36,7 @@ tokio-util = { version = "0.3.1", features = ["codec", "compat"] }
|
|||||||
discv5 = { git = "https://github.com/sigp/discv5", rev = "7b3bd40591b62b8c002ffdb85de008aa9f82e2e5" }
|
discv5 = { git = "https://github.com/sigp/discv5", rev = "7b3bd40591b62b8c002ffdb85de008aa9f82e2e5" }
|
||||||
tiny-keccak = "2.0.2"
|
tiny-keccak = "2.0.2"
|
||||||
libp2p-tcp = { version = "0.18.0", default-features = false, features = ["tokio"] }
|
libp2p-tcp = { version = "0.18.0", default-features = false, features = ["tokio"] }
|
||||||
|
environment = { path = "../../lighthouse/environment" }
|
||||||
|
|
||||||
[dependencies.libp2p]
|
[dependencies.libp2p]
|
||||||
version = "0.18.1"
|
version = "0.18.1"
|
||||||
@@ -49,3 +50,4 @@ slog-stdlog = "4.0.0"
|
|||||||
slog-term = "2.5.0"
|
slog-term = "2.5.0"
|
||||||
slog-async = "2.5.0"
|
slog-async = "2.5.0"
|
||||||
tempdir = "0.3.7"
|
tempdir = "0.3.7"
|
||||||
|
exit-future = "0.2.0"
|
||||||
|
|||||||
@@ -12,6 +12,8 @@ use types::{EnrForkId, MinimalEthSpec};
|
|||||||
type E = MinimalEthSpec;
|
type E = MinimalEthSpec;
|
||||||
use tempdir::TempDir;
|
use tempdir::TempDir;
|
||||||
|
|
||||||
|
type Libp2pInstance = (LibP2PService<E>, exit_future::Signal);
|
||||||
|
|
||||||
pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger {
|
pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger {
|
||||||
let decorator = slog_term::TermDecorator::new().build();
|
let decorator = slog_term::TermDecorator::new().build();
|
||||||
let drain = slog_term::FullFormat::new(decorator).build().fuse();
|
let drain = slog_term::FullFormat::new(decorator).build().fuse();
|
||||||
@@ -82,18 +84,19 @@ pub fn build_libp2p_instance(
|
|||||||
boot_nodes: Vec<Enr>,
|
boot_nodes: Vec<Enr>,
|
||||||
secret_key: Option<String>,
|
secret_key: Option<String>,
|
||||||
log: slog::Logger,
|
log: slog::Logger,
|
||||||
) -> LibP2PService<E> {
|
) -> Libp2pInstance {
|
||||||
let port = unused_port("tcp").unwrap();
|
let port = unused_port("tcp").unwrap();
|
||||||
let config = build_config(port, boot_nodes, secret_key);
|
let config = build_config(port, boot_nodes, secret_key);
|
||||||
|
let (signal, exit) = exit_future::signal();
|
||||||
|
let executor =
|
||||||
|
environment::TaskExecutor::new(tokio::runtime::Handle::current(), exit, log.clone());
|
||||||
// launch libp2p service
|
// launch libp2p service
|
||||||
LibP2PService::new(
|
(
|
||||||
tokio::runtime::Handle::current(),
|
LibP2PService::new(executor, &config, EnrForkId::default(), &log)
|
||||||
&config,
|
.expect("should build libp2p instance")
|
||||||
EnrForkId::default(),
|
.1,
|
||||||
&log,
|
signal,
|
||||||
)
|
)
|
||||||
.expect("should build libp2p instance")
|
|
||||||
.1
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
@@ -104,19 +107,19 @@ pub fn get_enr(node: &LibP2PService<E>) -> Enr {
|
|||||||
|
|
||||||
// Returns `n` libp2p peers in fully connected topology.
|
// Returns `n` libp2p peers in fully connected topology.
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub fn build_full_mesh(log: slog::Logger, n: usize) -> Vec<LibP2PService<E>> {
|
pub fn build_full_mesh(log: slog::Logger, n: usize) -> Vec<Libp2pInstance> {
|
||||||
let mut nodes: Vec<LibP2PService<E>> = (0..n)
|
let mut nodes: Vec<_> = (0..n)
|
||||||
.map(|_| build_libp2p_instance(vec![], None, log.clone()))
|
.map(|_| build_libp2p_instance(vec![], None, log.clone()))
|
||||||
.collect();
|
.collect();
|
||||||
let multiaddrs: Vec<Multiaddr> = nodes
|
let multiaddrs: Vec<Multiaddr> = nodes
|
||||||
.iter()
|
.iter()
|
||||||
.map(|x| get_enr(&x).multiaddr()[1].clone())
|
.map(|x| get_enr(&x.0).multiaddr()[1].clone())
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
for (i, node) in nodes.iter_mut().enumerate().take(n) {
|
for (i, node) in nodes.iter_mut().enumerate().take(n) {
|
||||||
for (j, multiaddr) in multiaddrs.iter().enumerate().skip(i) {
|
for (j, multiaddr) in multiaddrs.iter().enumerate().skip(i) {
|
||||||
if i != j {
|
if i != j {
|
||||||
match libp2p::Swarm::dial_addr(&mut node.swarm, multiaddr.clone()) {
|
match libp2p::Swarm::dial_addr(&mut node.0.swarm, multiaddr.clone()) {
|
||||||
Ok(()) => debug!(log, "Connected"),
|
Ok(()) => debug!(log, "Connected"),
|
||||||
Err(_) => error!(log, "Failed to connect"),
|
Err(_) => error!(log, "Failed to connect"),
|
||||||
};
|
};
|
||||||
@@ -129,26 +132,27 @@ pub fn build_full_mesh(log: slog::Logger, n: usize) -> Vec<LibP2PService<E>> {
|
|||||||
// Constructs a pair of nodes with separate loggers. The sender dials the receiver.
|
// Constructs a pair of nodes with separate loggers. The sender dials the receiver.
|
||||||
// This returns a (sender, receiver) pair.
|
// This returns a (sender, receiver) pair.
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub async fn build_node_pair(log: &slog::Logger) -> (LibP2PService<E>, LibP2PService<E>) {
|
pub async fn build_node_pair(log: &slog::Logger) -> (Libp2pInstance, Libp2pInstance) {
|
||||||
let sender_log = log.new(o!("who" => "sender"));
|
let sender_log = log.new(o!("who" => "sender"));
|
||||||
let receiver_log = log.new(o!("who" => "receiver"));
|
let receiver_log = log.new(o!("who" => "receiver"));
|
||||||
|
|
||||||
let mut sender = build_libp2p_instance(vec![], None, sender_log);
|
let mut sender = build_libp2p_instance(vec![], None, sender_log);
|
||||||
let mut receiver = build_libp2p_instance(vec![], None, receiver_log);
|
let mut receiver = build_libp2p_instance(vec![], None, receiver_log);
|
||||||
|
|
||||||
let receiver_multiaddr = receiver.swarm.discovery().local_enr().clone().multiaddr()[1].clone();
|
let receiver_multiaddr =
|
||||||
|
receiver.0.swarm.discovery().local_enr().clone().multiaddr()[1].clone();
|
||||||
|
|
||||||
// let the two nodes set up listeners
|
// let the two nodes set up listeners
|
||||||
let sender_fut = async {
|
let sender_fut = async {
|
||||||
loop {
|
loop {
|
||||||
if let Libp2pEvent::NewListenAddr(_) = sender.next_event().await {
|
if let Libp2pEvent::NewListenAddr(_) = sender.0.next_event().await {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let receiver_fut = async {
|
let receiver_fut = async {
|
||||||
loop {
|
loop {
|
||||||
if let Libp2pEvent::NewListenAddr(_) = receiver.next_event().await {
|
if let Libp2pEvent::NewListenAddr(_) = receiver.0.next_event().await {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -162,7 +166,7 @@ pub async fn build_node_pair(log: &slog::Logger) -> (LibP2PService<E>, LibP2PSer
|
|||||||
_ = joined => {}
|
_ = joined => {}
|
||||||
}
|
}
|
||||||
|
|
||||||
match libp2p::Swarm::dial_addr(&mut sender.swarm, receiver_multiaddr.clone()) {
|
match libp2p::Swarm::dial_addr(&mut sender.0.swarm, receiver_multiaddr.clone()) {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
debug!(log, "Sender dialed receiver"; "address" => format!("{:?}", receiver_multiaddr))
|
debug!(log, "Sender dialed receiver"; "address" => format!("{:?}", receiver_multiaddr))
|
||||||
}
|
}
|
||||||
@@ -173,16 +177,16 @@ pub async fn build_node_pair(log: &slog::Logger) -> (LibP2PService<E>, LibP2PSer
|
|||||||
|
|
||||||
// Returns `n` peers in a linear topology
|
// Returns `n` peers in a linear topology
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub fn build_linear(log: slog::Logger, n: usize) -> Vec<LibP2PService<E>> {
|
pub fn build_linear(log: slog::Logger, n: usize) -> Vec<Libp2pInstance> {
|
||||||
let mut nodes: Vec<LibP2PService<E>> = (0..n)
|
let mut nodes: Vec<_> = (0..n)
|
||||||
.map(|_| build_libp2p_instance(vec![], None, log.clone()))
|
.map(|_| build_libp2p_instance(vec![], None, log.clone()))
|
||||||
.collect();
|
.collect();
|
||||||
let multiaddrs: Vec<Multiaddr> = nodes
|
let multiaddrs: Vec<Multiaddr> = nodes
|
||||||
.iter()
|
.iter()
|
||||||
.map(|x| get_enr(&x).multiaddr()[1].clone())
|
.map(|x| get_enr(&x.0).multiaddr()[1].clone())
|
||||||
.collect();
|
.collect();
|
||||||
for i in 0..n - 1 {
|
for i in 0..n - 1 {
|
||||||
match libp2p::Swarm::dial_addr(&mut nodes[i].swarm, multiaddrs[i + 1].clone()) {
|
match libp2p::Swarm::dial_addr(&mut nodes[i].0.swarm, multiaddrs[i + 1].clone()) {
|
||||||
Ok(()) => debug!(log, "Connected"),
|
Ok(()) => debug!(log, "Connected"),
|
||||||
Err(_) => error!(log, "Failed to connect"),
|
Err(_) => error!(log, "Failed to connect"),
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -43,7 +43,7 @@ async fn test_gossipsub_forward() {
|
|||||||
let fut = async move {
|
let fut = async move {
|
||||||
for node in nodes.iter_mut() {
|
for node in nodes.iter_mut() {
|
||||||
loop {
|
loop {
|
||||||
match node.next_event().await {
|
match node.0.next_event().await {
|
||||||
Libp2pEvent::Behaviour(b) => match b {
|
Libp2pEvent::Behaviour(b) => match b {
|
||||||
BehaviourEvent::PubsubMessage {
|
BehaviourEvent::PubsubMessage {
|
||||||
topics,
|
topics,
|
||||||
@@ -61,7 +61,7 @@ async fn test_gossipsub_forward() {
|
|||||||
assert_eq!(message, pubsub_message.clone());
|
assert_eq!(message, pubsub_message.clone());
|
||||||
received_count += 1;
|
received_count += 1;
|
||||||
// Since `propagate_message` is false, need to propagate manually
|
// Since `propagate_message` is false, need to propagate manually
|
||||||
node.swarm.propagate_message(&source, id);
|
node.0.swarm.propagate_message(&source, id);
|
||||||
// Test should succeed if all nodes except the publisher receive the message
|
// Test should succeed if all nodes except the publisher receive the message
|
||||||
if received_count == num_nodes - 1 {
|
if received_count == num_nodes - 1 {
|
||||||
debug!(log.clone(), "Received message at {} nodes", num_nodes - 1);
|
debug!(log.clone(), "Received message at {} nodes", num_nodes - 1);
|
||||||
@@ -74,7 +74,7 @@ async fn test_gossipsub_forward() {
|
|||||||
subscribed_count += 1;
|
subscribed_count += 1;
|
||||||
// Every node except the corner nodes are connected to 2 nodes.
|
// Every node except the corner nodes are connected to 2 nodes.
|
||||||
if subscribed_count == (num_nodes * 2) - 2 {
|
if subscribed_count == (num_nodes * 2) - 2 {
|
||||||
node.swarm.publish(vec![pubsub_message.clone()]);
|
node.0.swarm.publish(vec![pubsub_message.clone()]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -129,7 +129,7 @@ async fn test_gossipsub_full_mesh_publish() {
|
|||||||
topics,
|
topics,
|
||||||
message,
|
message,
|
||||||
..
|
..
|
||||||
}) = node.next_event().await
|
}) = node.0.next_event().await
|
||||||
{
|
{
|
||||||
assert_eq!(topics.len(), 1);
|
assert_eq!(topics.len(), 1);
|
||||||
// Assert topic is the published topic
|
// Assert topic is the published topic
|
||||||
@@ -146,13 +146,16 @@ async fn test_gossipsub_full_mesh_publish() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
while let Libp2pEvent::Behaviour(BehaviourEvent::PeerSubscribed(_, topic)) =
|
while let Libp2pEvent::Behaviour(BehaviourEvent::PeerSubscribed(_, topic)) =
|
||||||
publishing_node.next_event().await
|
publishing_node.0.next_event().await
|
||||||
{
|
{
|
||||||
// Publish on beacon block topic
|
// Publish on beacon block topic
|
||||||
if topic == TopicHash::from_raw(publishing_topic.clone()) {
|
if topic == TopicHash::from_raw(publishing_topic.clone()) {
|
||||||
subscribed_count += 1;
|
subscribed_count += 1;
|
||||||
if subscribed_count == num_nodes - 1 {
|
if subscribed_count == num_nodes - 1 {
|
||||||
publishing_node.swarm.publish(vec![pubsub_message.clone()]);
|
publishing_node
|
||||||
|
.0
|
||||||
|
.swarm
|
||||||
|
.publish(vec![pubsub_message.clone()]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -136,14 +136,12 @@ async fn test_secio_noise_fallback() {
|
|||||||
|
|
||||||
let port = common::unused_port("tcp").unwrap();
|
let port = common::unused_port("tcp").unwrap();
|
||||||
let noisy_config = common::build_config(port, vec![], None);
|
let noisy_config = common::build_config(port, vec![], None);
|
||||||
let mut noisy_node = Service::new(
|
let (_signal, exit) = exit_future::signal();
|
||||||
tokio::runtime::Handle::current(),
|
let executor =
|
||||||
&noisy_config,
|
environment::TaskExecutor::new(tokio::runtime::Handle::current(), exit, log.clone());
|
||||||
EnrForkId::default(),
|
let mut noisy_node = Service::new(executor, &noisy_config, EnrForkId::default(), &log)
|
||||||
&log,
|
.expect("should build a libp2p instance")
|
||||||
)
|
.1;
|
||||||
.expect("should build a libp2p instance")
|
|
||||||
.1;
|
|
||||||
|
|
||||||
let port = common::unused_port("tcp").unwrap();
|
let port = common::unused_port("tcp").unwrap();
|
||||||
let secio_config = common::build_config(port, vec![common::get_enr(&noisy_node)], None);
|
let secio_config = common::build_config(port, vec![common::get_enr(&noisy_node)], None);
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ async fn test_status_rpc() {
|
|||||||
let log = common::build_log(log_level, enable_logging);
|
let log = common::build_log(log_level, enable_logging);
|
||||||
|
|
||||||
// get sender/receiver
|
// get sender/receiver
|
||||||
let (mut sender, mut receiver) = common::build_node_pair(&log).await;
|
let ((mut sender, _exit_1), (mut receiver, _exit_2)) = common::build_node_pair(&log).await;
|
||||||
|
|
||||||
// Dummy STATUS RPC message
|
// Dummy STATUS RPC message
|
||||||
let rpc_request = RPCRequest::Status(StatusMessage {
|
let rpc_request = RPCRequest::Status(StatusMessage {
|
||||||
@@ -126,7 +126,7 @@ async fn test_blocks_by_range_chunked_rpc() {
|
|||||||
let log = common::build_log(log_level, enable_logging);
|
let log = common::build_log(log_level, enable_logging);
|
||||||
|
|
||||||
// get sender/receiver
|
// get sender/receiver
|
||||||
let (mut sender, mut receiver) = common::build_node_pair(&log).await;
|
let ((mut sender, _exit_1), (mut receiver, _exit_2)) = common::build_node_pair(&log).await;
|
||||||
|
|
||||||
// BlocksByRange Request
|
// BlocksByRange Request
|
||||||
let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest {
|
let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest {
|
||||||
@@ -248,7 +248,7 @@ async fn test_blocks_by_range_chunked_rpc_terminates_correctly() {
|
|||||||
let log = common::build_log(log_level, enable_logging);
|
let log = common::build_log(log_level, enable_logging);
|
||||||
|
|
||||||
// get sender/receiver
|
// get sender/receiver
|
||||||
let (mut sender, mut receiver) = common::build_node_pair(&log).await;
|
let ((mut sender, _exit_1), (mut receiver, _exit_2)) = common::build_node_pair(&log).await;
|
||||||
|
|
||||||
// BlocksByRange Request
|
// BlocksByRange Request
|
||||||
let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest {
|
let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest {
|
||||||
@@ -379,7 +379,7 @@ async fn test_blocks_by_range_single_empty_rpc() {
|
|||||||
let log = common::build_log(log_level, enable_logging);
|
let log = common::build_log(log_level, enable_logging);
|
||||||
|
|
||||||
// get sender/receiver
|
// get sender/receiver
|
||||||
let (mut sender, mut receiver) = common::build_node_pair(&log).await;
|
let ((mut sender, _exit_1), (mut receiver, _exit_2)) = common::build_node_pair(&log).await;
|
||||||
|
|
||||||
// BlocksByRange Request
|
// BlocksByRange Request
|
||||||
let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest {
|
let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest {
|
||||||
@@ -505,7 +505,7 @@ async fn test_blocks_by_root_chunked_rpc() {
|
|||||||
let spec = E::default_spec();
|
let spec = E::default_spec();
|
||||||
|
|
||||||
// get sender/receiver
|
// get sender/receiver
|
||||||
let (mut sender, mut receiver) = common::build_node_pair(&log).await;
|
let ((mut sender, _exit_1), (mut receiver, _exit_2)) = common::build_node_pair(&log).await;
|
||||||
|
|
||||||
// BlocksByRoot Request
|
// BlocksByRoot Request
|
||||||
let rpc_request = RPCRequest::BlocksByRoot(BlocksByRootRequest {
|
let rpc_request = RPCRequest::BlocksByRoot(BlocksByRootRequest {
|
||||||
@@ -630,7 +630,7 @@ async fn test_blocks_by_root_chunked_rpc_terminates_correctly() {
|
|||||||
let spec = E::default_spec();
|
let spec = E::default_spec();
|
||||||
|
|
||||||
// get sender/receiver
|
// get sender/receiver
|
||||||
let (mut sender, mut receiver) = common::build_node_pair(&log).await;
|
let ((mut sender, _exit_1), (mut receiver, _exit_2)) = common::build_node_pair(&log).await;
|
||||||
|
|
||||||
// BlocksByRoot Request
|
// BlocksByRoot Request
|
||||||
let rpc_request = RPCRequest::BlocksByRoot(BlocksByRootRequest {
|
let rpc_request = RPCRequest::BlocksByRoot(BlocksByRootRequest {
|
||||||
@@ -772,7 +772,7 @@ async fn test_goodbye_rpc() {
|
|||||||
let log = common::build_log(log_level, enable_logging);
|
let log = common::build_log(log_level, enable_logging);
|
||||||
|
|
||||||
// get sender/receiver
|
// get sender/receiver
|
||||||
let (mut sender, mut receiver) = common::build_node_pair(&log).await;
|
let ((mut sender, _exit_1), (mut receiver, _exit_2)) = common::build_node_pair(&log).await;
|
||||||
|
|
||||||
// Goodbye Request
|
// Goodbye Request
|
||||||
let rpc_request = RPCRequest::Goodbye(GoodbyeReason::ClientShutdown);
|
let rpc_request = RPCRequest::Goodbye(GoodbyeReason::ClientShutdown);
|
||||||
|
|||||||
Reference in New Issue
Block a user