Merge branch 'master' into spec-v0.12

This commit is contained in:
Paul Hauner
2020-06-21 10:33:02 +10:00
60 changed files with 1812 additions and 1107 deletions

View File

@@ -0,0 +1,199 @@
#![cfg(test)]
use eth2_libp2p::Enr;
use eth2_libp2p::EnrExt;
use eth2_libp2p::Multiaddr;
use eth2_libp2p::Service as LibP2PService;
use eth2_libp2p::{Libp2pEvent, NetworkConfig};
use slog::{debug, error, o, Drain};
use std::net::{TcpListener, UdpSocket};
use std::time::Duration;
use types::{EnrForkId, MinimalEthSpec};
type E = MinimalEthSpec;
use tempdir::TempDir;
pub struct Libp2pInstance(LibP2PService<E>, exit_future::Signal);
impl std::ops::Deref for Libp2pInstance {
type Target = LibP2PService<E>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::ops::DerefMut for Libp2pInstance {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger {
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();
if enabled {
slog::Logger::root(drain.filter_level(level).fuse(), o!())
} else {
slog::Logger::root(drain.filter(|_| false).fuse(), o!())
}
}
// A bit of hack to find an unused port.
///
/// Does not guarantee that the given port is unused after the function exists, just that it was
/// unused before the function started (i.e., it does not reserve a port).
pub fn unused_port(transport: &str) -> Result<u16, String> {
let local_addr = match transport {
"tcp" => {
let listener = TcpListener::bind("127.0.0.1:0").map_err(|e| {
format!("Failed to create TCP listener to find unused port: {:?}", e)
})?;
listener.local_addr().map_err(|e| {
format!(
"Failed to read TCP listener local_addr to find unused port: {:?}",
e
)
})?
}
"udp" => {
let socket = UdpSocket::bind("127.0.0.1:0")
.map_err(|e| format!("Failed to create UDP socket to find unused port: {:?}", e))?;
socket.local_addr().map_err(|e| {
format!(
"Failed to read UDP socket local_addr to find unused port: {:?}",
e
)
})?
}
_ => return Err("Invalid transport to find unused port".into()),
};
Ok(local_addr.port())
}
pub fn build_config(port: u16, mut boot_nodes: Vec<Enr>) -> NetworkConfig {
let mut config = NetworkConfig::default();
let path = TempDir::new(&format!("libp2p_test{}", port)).unwrap();
config.libp2p_port = port; // tcp port
config.discovery_port = port; // udp port
config.enr_tcp_port = Some(port);
config.enr_udp_port = Some(port);
config.enr_address = Some("127.0.0.1".parse().unwrap());
config.boot_nodes.append(&mut boot_nodes);
config.network_dir = path.into_path();
// Reduce gossipsub heartbeat parameters
config.gs_config.heartbeat_initial_delay = Duration::from_millis(500);
config.gs_config.heartbeat_interval = Duration::from_millis(500);
config
}
pub fn build_libp2p_instance(boot_nodes: Vec<Enr>, log: slog::Logger) -> Libp2pInstance {
let port = unused_port("tcp").unwrap();
let config = build_config(port, boot_nodes);
// launch libp2p service
let (signal, exit) = exit_future::signal();
let executor =
environment::TaskExecutor::new(tokio::runtime::Handle::current(), exit, log.clone());
Libp2pInstance(
LibP2PService::new(executor, &config, EnrForkId::default(), &log)
.expect("should build libp2p instance")
.1,
signal,
)
}
#[allow(dead_code)]
pub fn get_enr(node: &LibP2PService<E>) -> Enr {
let enr = node.swarm.local_enr().clone();
enr
}
// Returns `n` libp2p peers in fully connected topology.
#[allow(dead_code)]
pub fn build_full_mesh(log: slog::Logger, n: usize) -> Vec<Libp2pInstance> {
let mut nodes: Vec<_> = (0..n)
.map(|_| build_libp2p_instance(vec![], log.clone()))
.collect();
let multiaddrs: Vec<Multiaddr> = nodes
.iter()
.map(|x| get_enr(&x).multiaddr()[1].clone())
.collect();
for (i, node) in nodes.iter_mut().enumerate().take(n) {
for (j, multiaddr) in multiaddrs.iter().enumerate().skip(i) {
if i != j {
match libp2p::Swarm::dial_addr(&mut node.swarm, multiaddr.clone()) {
Ok(()) => debug!(log, "Connected"),
Err(_) => error!(log, "Failed to connect"),
};
}
}
}
nodes
}
// Constructs a pair of nodes with separate loggers. The sender dials the receiver.
// This returns a (sender, receiver) pair.
#[allow(dead_code)]
pub async fn build_node_pair(log: &slog::Logger) -> (Libp2pInstance, Libp2pInstance) {
let sender_log = log.new(o!("who" => "sender"));
let receiver_log = log.new(o!("who" => "receiver"));
let mut sender = build_libp2p_instance(vec![], sender_log);
let mut receiver = build_libp2p_instance(vec![], receiver_log);
let receiver_multiaddr = receiver.swarm.local_enr().multiaddr()[1].clone();
// let the two nodes set up listeners
let sender_fut = async {
loop {
if let Libp2pEvent::NewListenAddr(_) = sender.next_event().await {
return;
}
}
};
let receiver_fut = async {
loop {
if let Libp2pEvent::NewListenAddr(_) = receiver.next_event().await {
return;
}
}
};
let joined = futures::future::join(sender_fut, receiver_fut);
// wait for either both nodes to listen or a timeout
tokio::select! {
_ = tokio::time::delay_for(Duration::from_millis(500)) => {}
_ = joined => {}
}
match libp2p::Swarm::dial_addr(&mut sender.swarm, receiver_multiaddr.clone()) {
Ok(()) => {
debug!(log, "Sender dialed receiver"; "address" => format!("{:?}", receiver_multiaddr))
}
Err(_) => error!(log, "Dialing failed"),
};
(sender, receiver)
}
// Returns `n` peers in a linear topology
#[allow(dead_code)]
pub fn build_linear(log: slog::Logger, n: usize) -> Vec<Libp2pInstance> {
let mut nodes: Vec<_> = (0..n)
.map(|_| build_libp2p_instance(vec![], log.clone()))
.collect();
let multiaddrs: Vec<Multiaddr> = nodes
.iter()
.map(|x| get_enr(&x).multiaddr()[1].clone())
.collect();
for i in 0..n - 1 {
match libp2p::Swarm::dial_addr(&mut nodes[i].swarm, multiaddrs[i + 1].clone()) {
Ok(()) => debug!(log, "Connected"),
Err(_) => error!(log, "Failed to connect"),
};
}
nodes
}

View File

@@ -0,0 +1,171 @@
/* These are temporarily disabled due to their non-deterministic behaviour and impending update to
* gossipsub 1.1. We leave these here as a template for future test upgrades
#![cfg(test)]
use crate::types::GossipEncoding;
use ::types::{BeaconBlock, EthSpec, MinimalEthSpec, Signature, SignedBeaconBlock};
use eth2_libp2p::*;
use slog::{debug, Level};
type E = MinimalEthSpec;
mod common;
/* Gossipsub tests */
// Note: The aim of these tests is not to test the robustness of the gossip network
// but to check if the gossipsub implementation is behaving according to the specifications.
// Test if gossipsub message are forwarded by nodes with a simple linear topology.
//
// Topology used in test
//
// node1 <-> node2 <-> node3 ..... <-> node(n-1) <-> node(n)
#[tokio::test]
async fn test_gossipsub_forward() {
// set up the logging. The level and enabled or not
let log = common::build_log(Level::Info, false);
let num_nodes = 20;
let mut nodes = common::build_linear(log.clone(), num_nodes);
let mut received_count = 0;
let spec = E::default_spec();
let empty_block = BeaconBlock::empty(&spec);
let signed_block = SignedBeaconBlock {
message: empty_block,
signature: Signature::empty_signature(),
};
let pubsub_message = PubsubMessage::BeaconBlock(Box::new(signed_block));
let publishing_topic: String = pubsub_message
.topics(GossipEncoding::default(), [0, 0, 0, 0])
.first()
.unwrap()
.clone()
.into();
let mut subscribed_count = 0;
let fut = async move {
for node in nodes.iter_mut() {
loop {
match node.next_event().await {
Libp2pEvent::Behaviour(b) => match b {
BehaviourEvent::PubsubMessage {
topics,
message,
source,
id,
} => {
assert_eq!(topics.len(), 1);
// Assert topic is the published topic
assert_eq!(
topics.first().unwrap(),
&TopicHash::from_raw(publishing_topic.clone())
);
// Assert message received is the correct one
assert_eq!(message, pubsub_message.clone());
received_count += 1;
// Since `propagate_message` is false, need to propagate manually
node.swarm.propagate_message(&source, id);
// Test should succeed if all nodes except the publisher receive the message
if received_count == num_nodes - 1 {
debug!(log.clone(), "Received message at {} nodes", num_nodes - 1);
return;
}
}
BehaviourEvent::PeerSubscribed(_, topic) => {
// Publish on beacon block topic
if topic == TopicHash::from_raw(publishing_topic.clone()) {
subscribed_count += 1;
// Every node except the corner nodes are connected to 2 nodes.
if subscribed_count == (num_nodes * 2) - 2 {
node.swarm.publish(vec![pubsub_message.clone()]);
}
}
}
_ => break,
},
_ => break,
}
}
}
};
tokio::select! {
_ = fut => {}
_ = tokio::time::delay_for(tokio::time::Duration::from_millis(800)) => {
panic!("Future timed out");
}
}
}
// Test publishing of a message with a full mesh for the topic
// Not very useful but this is the bare minimum functionality.
#[tokio::test]
async fn test_gossipsub_full_mesh_publish() {
// set up the logging. The level and enabled or not
let log = common::build_log(Level::Debug, false);
// Note: This test does not propagate gossipsub messages.
// Having `num_nodes` > `mesh_n_high` may give inconsistent results
// as nodes may get pruned out of the mesh before the gossipsub message
// is published to them.
let num_nodes = 12;
let mut nodes = common::build_full_mesh(log, num_nodes);
let mut publishing_node = nodes.pop().unwrap();
let spec = E::default_spec();
let empty_block = BeaconBlock::empty(&spec);
let signed_block = SignedBeaconBlock {
message: empty_block,
signature: Signature::empty_signature(),
};
let pubsub_message = PubsubMessage::BeaconBlock(Box::new(signed_block));
let publishing_topic: String = pubsub_message
.topics(GossipEncoding::default(), [0, 0, 0, 0])
.first()
.unwrap()
.clone()
.into();
let mut subscribed_count = 0;
let mut received_count = 0;
let fut = async move {
for node in nodes.iter_mut() {
while let Libp2pEvent::Behaviour(BehaviourEvent::PubsubMessage {
topics,
message,
..
}) = node.next_event().await
{
assert_eq!(topics.len(), 1);
// Assert topic is the published topic
assert_eq!(
topics.first().unwrap(),
&TopicHash::from_raw(publishing_topic.clone())
);
// Assert message received is the correct one
assert_eq!(message, pubsub_message.clone());
received_count += 1;
if received_count == num_nodes - 1 {
return;
}
}
}
while let Libp2pEvent::Behaviour(BehaviourEvent::PeerSubscribed(_, topic)) =
publishing_node.next_event().await
{
// Publish on beacon block topic
if topic == TopicHash::from_raw(publishing_topic.clone()) {
subscribed_count += 1;
if subscribed_count == num_nodes - 1 {
publishing_node.swarm.publish(vec![pubsub_message.clone()]);
}
}
}
};
tokio::select! {
_ = fut => {}
_ = tokio::time::delay_for(tokio::time::Duration::from_millis(800)) => {
panic!("Future timed out");
}
}
}
*/

View File

@@ -0,0 +1,184 @@
#![cfg(test)]
use crate::behaviour::Behaviour;
use crate::multiaddr::Protocol;
use ::types::{EnrForkId, MinimalEthSpec};
use eth2_libp2p::discovery::{build_enr, CombinedKey, CombinedKeyExt};
use eth2_libp2p::*;
use futures::prelude::*;
use libp2p::core::identity::Keypair;
use libp2p::{
core,
core::{muxing::StreamMuxerBox, transport::boxed::Boxed},
secio,
swarm::{SwarmBuilder, SwarmEvent},
PeerId, Swarm, Transport,
};
use slog::{crit, debug, info, Level};
use std::io::{Error, ErrorKind};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
type TSpec = MinimalEthSpec;
mod common;
type Libp2pBehaviour = Behaviour<TSpec>;
/// Build and return a eth2_libp2p Swarm with only secio support.
fn build_secio_swarm(
config: &NetworkConfig,
log: slog::Logger,
) -> error::Result<Swarm<Libp2pBehaviour>> {
let local_keypair = Keypair::generate_secp256k1();
let local_peer_id = PeerId::from(local_keypair.public());
let enr_key = CombinedKey::from_libp2p(&local_keypair).unwrap();
let enr = build_enr::<TSpec>(&enr_key, config, EnrForkId::default()).unwrap();
let network_globals = Arc::new(NetworkGlobals::new(
enr,
config.libp2p_port,
config.discovery_port,
&log,
));
let mut swarm = {
// Set up the transport - tcp/ws with secio and mplex/yamux
let transport = build_secio_transport(local_keypair.clone());
// Lighthouse network behaviour
let behaviour = Behaviour::new(&local_keypair, config, network_globals.clone(), &log)?;
// requires a tokio runtime
struct Executor(tokio::runtime::Handle);
impl libp2p::core::Executor for Executor {
fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
self.0.spawn(f);
}
}
SwarmBuilder::new(transport, behaviour, local_peer_id.clone())
.executor(Box::new(Executor(tokio::runtime::Handle::current())))
.build()
};
// listen on the specified address
let listen_multiaddr = {
let mut m = Multiaddr::from(config.listen_address);
m.push(Protocol::Tcp(config.libp2p_port));
m
};
match Swarm::listen_on(&mut swarm, listen_multiaddr.clone()) {
Ok(_) => {
let mut log_address = listen_multiaddr;
log_address.push(Protocol::P2p(local_peer_id.clone().into()));
info!(log, "Listening established"; "address" => format!("{}", log_address));
}
Err(err) => {
crit!(
log,
"Unable to listen on libp2p address";
"error" => format!("{:?}", err),
"listen_multiaddr" => format!("{}", listen_multiaddr),
);
return Err("Libp2p was unable to listen on the given listen address.".into());
}
};
// helper closure for dialing peers
let mut dial_addr = |multiaddr: &Multiaddr| {
match Swarm::dial_addr(&mut swarm, multiaddr.clone()) {
Ok(()) => debug!(log, "Dialing libp2p peer"; "address" => format!("{}", multiaddr)),
Err(err) => debug!(
log,
"Could not connect to peer"; "address" => format!("{}", multiaddr), "error" => format!("{:?}", err)
),
};
};
// attempt to connect to any specified boot-nodes
for bootnode_enr in &config.boot_nodes {
for multiaddr in &bootnode_enr.multiaddr() {
// ignore udp multiaddr if it exists
let components = multiaddr.iter().collect::<Vec<_>>();
if let Protocol::Udp(_) = components[1] {
continue;
}
dial_addr(multiaddr);
}
}
Ok(swarm)
}
/// Build a simple TCP transport with secio, mplex/yamux.
fn build_secio_transport(local_private_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> {
let transport = libp2p_tcp::TokioTcpConfig::new().nodelay(true);
transport
.upgrade(core::upgrade::Version::V1)
.authenticate(secio::SecioConfig::new(local_private_key))
.multiplex(core::upgrade::SelectUpgrade::new(
libp2p::yamux::Config::default(),
libp2p::mplex::MplexConfig::new(),
))
.map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer)))
.timeout(Duration::from_secs(20))
.timeout(Duration::from_secs(20))
.map_err(|err| Error::new(ErrorKind::Other, err))
.boxed()
}
/// Test if the encryption falls back to secio if noise isn't available
#[tokio::test]
async fn test_secio_noise_fallback() {
// set up the logging. The level and enabled logging or not
let log_level = Level::Trace;
let enable_logging = false;
let log = common::build_log(log_level, enable_logging);
let port = common::unused_port("tcp").unwrap();
let noisy_config = common::build_config(port, vec![]);
let (_signal, exit) = exit_future::signal();
let executor =
environment::TaskExecutor::new(tokio::runtime::Handle::current(), exit, log.clone());
let mut noisy_node = Service::new(executor, &noisy_config, EnrForkId::default(), &log)
.expect("should build a libp2p instance")
.1;
let port = common::unused_port("tcp").unwrap();
let secio_config = common::build_config(port, vec![common::get_enr(&noisy_node)]);
// Building a custom Libp2pService from outside the crate isn't possible because of
// private fields in the Libp2pService struct. A swarm is good enough for testing
// compatibility with secio.
let mut secio_swarm =
build_secio_swarm(&secio_config, log.clone()).expect("should build a secio swarm");
let secio_log = log.clone();
let noisy_future = async {
loop {
noisy_node.next_event().await;
}
};
let secio_future = async {
loop {
match secio_swarm.next_event().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
// secio node negotiated a secio transport with
// the noise compatible node
info!(secio_log, "Connected to peer {}", peer_id);
return;
}
_ => {} // Ignore all other events
}
}
};
tokio::select! {
_ = noisy_future => {}
_ = secio_future => {}
_ = tokio::time::delay_for(Duration::from_millis(800)) => {
panic!("Future timed out");
}
}
}

View File

@@ -0,0 +1,760 @@
#![cfg(test)]
use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::{BehaviourEvent, Libp2pEvent, Request, Response};
use slog::{debug, warn, Level};
use ssz_types::VariableList;
use std::time::Duration;
use tokio::time::delay_for;
use types::{
BeaconBlock, Epoch, EthSpec, Hash256, MinimalEthSpec, Signature, SignedBeaconBlock, Slot,
};
mod common;
type E = MinimalEthSpec;
#[tokio::test]
// Tests the STATUS RPC message
async fn test_status_rpc() {
// set up the logging. The level and enabled logging or not
let log_level = Level::Debug;
let enable_logging = false;
let log = common::build_log(log_level, enable_logging);
// get sender/receiver
let (mut sender, mut receiver) = common::build_node_pair(&log).await;
// Dummy STATUS RPC message
let rpc_request = Request::Status(StatusMessage {
fork_digest: [0; 4],
finalized_root: Hash256::from_low_u64_be(0),
finalized_epoch: Epoch::new(1),
head_root: Hash256::from_low_u64_be(0),
head_slot: Slot::new(1),
});
// Dummy STATUS RPC message
let rpc_response = Response::Status(StatusMessage {
fork_digest: [0; 4],
finalized_root: Hash256::from_low_u64_be(0),
finalized_epoch: Epoch::new(1),
head_root: Hash256::from_low_u64_be(0),
head_slot: Slot::new(1),
});
// build the sender future
let sender_future = async {
loop {
match sender.next_event().await {
Libp2pEvent::PeerConnected { peer_id, .. } => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
.swarm
.send_request(peer_id, RequestId::Sync(10), rpc_request.clone());
}
Libp2pEvent::Behaviour(BehaviourEvent::ResponseReceived {
peer_id: _,
id: RequestId::Sync(10),
response,
}) => {
// Should receive the RPC response
debug!(log, "Sender Received");
assert_eq!(response, rpc_response.clone());
debug!(log, "Sender Completed");
return;
}
_ => {}
}
}
};
// build the receiver future
let receiver_future = async {
loop {
match receiver.next_event().await {
Libp2pEvent::Behaviour(BehaviourEvent::RequestReceived {
peer_id,
id,
request,
}) => {
if request == rpc_request {
// send the response
debug!(log, "Receiver Received");
receiver
.swarm
.send_successful_response(peer_id, id, rpc_response.clone());
}
}
_ => {} // Ignore other events
}
}
};
tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = delay_for(Duration::from_secs(30)) => {
panic!("Future timed out");
}
}
}
#[tokio::test]
// Tests a streamed BlocksByRange RPC Message
async fn test_blocks_by_range_chunked_rpc() {
// set up the logging. The level and enabled logging or not
let log_level = Level::Trace;
let enable_logging = false;
let messages_to_send = 10;
let log = common::build_log(log_level, enable_logging);
// get sender/receiver
let (mut sender, mut receiver) = common::build_node_pair(&log).await;
// BlocksByRange Request
let rpc_request = Request::BlocksByRange(BlocksByRangeRequest {
start_slot: 0,
count: messages_to_send,
step: 0,
});
// BlocksByRange Response
let spec = E::default_spec();
let empty_block = BeaconBlock::empty(&spec);
let empty_signed = SignedBeaconBlock {
message: empty_block,
signature: Signature::empty_signature(),
};
let rpc_response = Response::BlocksByRange(Some(Box::new(empty_signed)));
// keep count of the number of messages received
let mut messages_received = 0;
// build the sender future
let sender_future = async {
loop {
match sender.next_event().await {
Libp2pEvent::PeerConnected { peer_id, .. } => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
.swarm
.send_request(peer_id, RequestId::Sync(10), rpc_request.clone());
}
Libp2pEvent::Behaviour(BehaviourEvent::ResponseReceived {
peer_id: _,
id: RequestId::Sync(10),
response,
}) => {
warn!(log, "Sender received a response");
match response {
Response::BlocksByRange(Some(_)) => {
assert_eq!(response, rpc_response.clone());
messages_received += 1;
warn!(log, "Chunk received");
}
Response::BlocksByRange(None) => {
// should be exactly 10 messages before terminating
assert_eq!(messages_received, messages_to_send);
// end the test
return;
}
_ => panic!("Invalid RPC received"),
}
}
_ => {} // Ignore other behaviour events
}
}
};
// build the receiver future
let receiver_future = async {
loop {
match receiver.next_event().await {
Libp2pEvent::Behaviour(BehaviourEvent::RequestReceived {
peer_id,
id,
request,
}) => {
if request == rpc_request {
// send the response
warn!(log, "Receiver got request");
for _ in 1..=messages_to_send {
receiver.swarm.send_successful_response(
peer_id.clone(),
id,
rpc_response.clone(),
);
}
// send the stream termination
receiver.swarm.send_successful_response(
peer_id,
id,
Response::BlocksByRange(None),
);
}
}
_ => {} // Ignore other events
}
}
};
tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = delay_for(Duration::from_secs(30)) => {
panic!("Future timed out");
}
}
}
#[tokio::test]
// Tests that a streamed BlocksByRange RPC Message terminates when all expected chunks were received
async fn test_blocks_by_range_chunked_rpc_terminates_correctly() {
// set up the logging. The level and enabled logging or not
let log_level = Level::Debug;
let enable_logging = false;
let messages_to_send = 10;
let extra_messages_to_send = 10;
let log = common::build_log(log_level, enable_logging);
// get sender/receiver
let (mut sender, mut receiver) = common::build_node_pair(&log).await;
// BlocksByRange Request
let rpc_request = Request::BlocksByRange(BlocksByRangeRequest {
start_slot: 0,
count: messages_to_send,
step: 0,
});
// BlocksByRange Response
let spec = E::default_spec();
let empty_block = BeaconBlock::empty(&spec);
let empty_signed = SignedBeaconBlock {
message: empty_block,
signature: Signature::empty_signature(),
};
let rpc_response = Response::BlocksByRange(Some(Box::new(empty_signed)));
// keep count of the number of messages received
let mut messages_received: u64 = 0;
// build the sender future
let sender_future = async {
loop {
match sender.next_event().await {
Libp2pEvent::PeerConnected { peer_id, .. } => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
.swarm
.send_request(peer_id, RequestId::Sync(10), rpc_request.clone());
}
Libp2pEvent::Behaviour(BehaviourEvent::ResponseReceived {
peer_id: _,
id: RequestId::Sync(10),
response,
}) =>
// Should receive the RPC response
{
debug!(log, "Sender received a response");
match response {
Response::BlocksByRange(Some(_)) => {
assert_eq!(response, rpc_response.clone());
messages_received += 1;
}
Response::BlocksByRange(None) => {
// should be exactly 10 messages, as requested
assert_eq!(messages_received, messages_to_send);
}
_ => panic!("Invalid RPC received"),
}
}
_ => {} // Ignore other behaviour events
}
}
};
// determine messages to send (PeerId, RequestId). If some, indicates we still need to send
// messages
let mut message_info = None;
// the number of messages we've sent
let mut messages_sent = 0;
let receiver_future = async {
loop {
// this future either drives the sending/receiving or times out allowing messages to be
// sent in the timeout
match futures::future::select(
Box::pin(receiver.next_event()),
tokio::time::delay_for(Duration::from_secs(1)),
)
.await
{
futures::future::Either::Left((
Libp2pEvent::Behaviour(BehaviourEvent::RequestReceived {
peer_id,
id,
request,
}),
_,
)) => {
if request == rpc_request {
// send the response
warn!(log, "Receiver got request");
message_info = Some((peer_id, id));
}
}
futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required
_ => continue,
}
// if we need to send messages send them here. This will happen after a delay
if message_info.is_some() {
messages_sent += 1;
let (peer_id, stream_id) = message_info.as_ref().unwrap();
receiver.swarm.send_successful_response(
peer_id.clone(),
stream_id.clone(),
rpc_response.clone(),
);
debug!(log, "Sending message {}", messages_sent);
if messages_sent == messages_to_send + extra_messages_to_send {
// stop sending messages
return;
}
}
}
};
tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = delay_for(Duration::from_secs(30)) => {
panic!("Future timed out");
}
}
}
#[tokio::test]
// Tests an empty response to a BlocksByRange RPC Message
async fn test_blocks_by_range_single_empty_rpc() {
// set up the logging. The level and enabled logging or not
let log_level = Level::Trace;
let enable_logging = false;
let log = common::build_log(log_level, enable_logging);
// get sender/receiver
let (mut sender, mut receiver) = common::build_node_pair(&log).await;
// BlocksByRange Request
let rpc_request = Request::BlocksByRange(BlocksByRangeRequest {
start_slot: 0,
count: 10,
step: 0,
});
// BlocksByRange Response
let spec = E::default_spec();
let empty_block = BeaconBlock::empty(&spec);
let empty_signed = SignedBeaconBlock {
message: empty_block,
signature: Signature::empty_signature(),
};
let rpc_response = Response::BlocksByRange(Some(Box::new(empty_signed)));
let messages_to_send = 1;
// keep count of the number of messages received
let mut messages_received = 0;
// build the sender future
let sender_future = async {
loop {
match sender.next_event().await {
Libp2pEvent::PeerConnected { peer_id, .. } => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
.swarm
.send_request(peer_id, RequestId::Sync(10), rpc_request.clone());
}
Libp2pEvent::Behaviour(BehaviourEvent::ResponseReceived {
peer_id: _,
id: RequestId::Sync(10),
response,
}) => match response {
Response::BlocksByRange(Some(_)) => {
assert_eq!(response, rpc_response.clone());
messages_received += 1;
warn!(log, "Chunk received");
}
Response::BlocksByRange(None) => {
// should be exactly 10 messages before terminating
assert_eq!(messages_received, messages_to_send);
// end the test
return;
}
_ => panic!("Invalid RPC received"),
},
_ => {} // Ignore other behaviour events
}
}
};
// build the receiver future
let receiver_future = async {
loop {
match receiver.next_event().await {
Libp2pEvent::Behaviour(BehaviourEvent::RequestReceived {
peer_id,
id,
request,
}) => {
if request == rpc_request {
// send the response
warn!(log, "Receiver got request");
for _ in 1..=messages_to_send {
receiver.swarm.send_successful_response(
peer_id.clone(),
id,
rpc_response.clone(),
);
}
// send the stream termination
receiver.swarm.send_successful_response(
peer_id,
id,
Response::BlocksByRange(None),
);
}
}
_ => {} // Ignore other events
}
}
};
tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = delay_for(Duration::from_secs(20)) => {
panic!("Future timed out");
}
}
}
#[tokio::test]
// Tests a streamed, chunked BlocksByRoot RPC Message
// The size of the reponse is a full `BeaconBlock`
// which is greater than the Snappy frame size. Hence, this test
// serves to test the snappy framing format as well.
async fn test_blocks_by_root_chunked_rpc() {
// set up the logging. The level and enabled logging or not
let log_level = Level::Debug;
let enable_logging = false;
let messages_to_send = 3;
let log = common::build_log(log_level, enable_logging);
let spec = E::default_spec();
// get sender/receiver
let (mut sender, mut receiver) = common::build_node_pair(&log).await;
// BlocksByRoot Request
let rpc_request = Request::BlocksByRoot(BlocksByRootRequest {
block_roots: VariableList::from(vec![
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
]),
});
// BlocksByRoot Response
let full_block = BeaconBlock::full(&spec);
let signed_full_block = SignedBeaconBlock {
message: full_block,
signature: Signature::empty_signature(),
};
let rpc_response = Response::BlocksByRoot(Some(Box::new(signed_full_block)));
// keep count of the number of messages received
let mut messages_received = 0;
// build the sender future
let sender_future = async {
loop {
match sender.next_event().await {
Libp2pEvent::PeerConnected { peer_id, .. } => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
.swarm
.send_request(peer_id, RequestId::Sync(10), rpc_request.clone());
}
Libp2pEvent::Behaviour(BehaviourEvent::ResponseReceived {
peer_id: _,
id: RequestId::Sync(10),
response,
}) => match response {
Response::BlocksByRoot(Some(_)) => {
assert_eq!(response, rpc_response.clone());
messages_received += 1;
debug!(log, "Chunk received");
}
Response::BlocksByRoot(None) => {
// should be exactly messages_to_send
assert_eq!(messages_received, messages_to_send);
// end the test
return;
}
_ => {} // Ignore other RPC messages
},
_ => {} // Ignore other behaviour events
}
}
};
// build the receiver future
let receiver_future = async {
loop {
match receiver.next_event().await {
Libp2pEvent::Behaviour(BehaviourEvent::RequestReceived {
peer_id,
id,
request,
}) => {
if request == rpc_request {
// send the response
debug!(log, "Receiver got request");
for _ in 1..=messages_to_send {
receiver.swarm.send_successful_response(
peer_id.clone(),
id,
rpc_response.clone(),
);
debug!(log, "Sending message");
}
// send the stream termination
receiver.swarm.send_successful_response(
peer_id,
id,
Response::BlocksByRange(None),
);
debug!(log, "Send stream term");
}
}
_ => {} // Ignore other events
}
}
};
tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = delay_for(Duration::from_secs(30)) => {
panic!("Future timed out");
}
}
}
#[tokio::test]
// Tests a streamed, chunked BlocksByRoot RPC Message terminates when all expected reponses have been received
async fn test_blocks_by_root_chunked_rpc_terminates_correctly() {
// set up the logging. The level and enabled logging or not
let log_level = Level::Debug;
let enable_logging = false;
let messages_to_send: u64 = 10;
let extra_messages_to_send: u64 = 10;
let log = common::build_log(log_level, enable_logging);
let spec = E::default_spec();
// get sender/receiver
let (mut sender, mut receiver) = common::build_node_pair(&log).await;
// BlocksByRoot Request
let rpc_request = Request::BlocksByRoot(BlocksByRootRequest {
block_roots: VariableList::from(vec![
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
]),
});
// BlocksByRoot Response
let full_block = BeaconBlock::full(&spec);
let signed_full_block = SignedBeaconBlock {
message: full_block,
signature: Signature::empty_signature(),
};
let rpc_response = Response::BlocksByRoot(Some(Box::new(signed_full_block)));
// keep count of the number of messages received
let mut messages_received = 0;
// build the sender future
let sender_future = async {
loop {
match sender.next_event().await {
Libp2pEvent::PeerConnected { peer_id, .. } => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
.swarm
.send_request(peer_id, RequestId::Sync(10), rpc_request.clone());
}
Libp2pEvent::Behaviour(BehaviourEvent::ResponseReceived {
peer_id: _,
id: RequestId::Sync(10),
response,
}) => {
debug!(log, "Sender received a response");
match response {
Response::BlocksByRoot(Some(_)) => {
assert_eq!(response, rpc_response.clone());
messages_received += 1;
debug!(log, "Chunk received");
}
Response::BlocksByRoot(None) => {
// should be exactly messages_to_send
assert_eq!(messages_received, messages_to_send);
// end the test
return;
}
_ => {} // Ignore other RPC messages
}
}
_ => {} // Ignore other behaviour events
}
}
};
// determine messages to send (PeerId, RequestId). If some, indicates we still need to send
// messages
let mut message_info = None;
// the number of messages we've sent
let mut messages_sent = 0;
let receiver_future = async {
loop {
// this future either drives the sending/receiving or times out allowing messages to be
// sent in the timeout
match futures::future::select(
Box::pin(receiver.next_event()),
tokio::time::delay_for(Duration::from_millis(1000)),
)
.await
{
futures::future::Either::Left((
Libp2pEvent::Behaviour(BehaviourEvent::RequestReceived {
peer_id,
id,
request,
}),
_,
)) => {
if request == rpc_request {
// send the response
warn!(log, "Receiver got request");
message_info = Some((peer_id, id));
}
}
futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required
_ => continue,
}
// if we need to send messages send them here. This will happen after a delay
if message_info.is_some() {
messages_sent += 1;
let (peer_id, stream_id) = message_info.as_ref().unwrap();
receiver.swarm.send_successful_response(
peer_id.clone(),
stream_id.clone(),
rpc_response.clone(),
);
debug!(log, "Sending message {}", messages_sent);
if messages_sent == messages_to_send + extra_messages_to_send {
// stop sending messages
return;
}
}
}
};
tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = delay_for(Duration::from_secs(30)) => {
panic!("Future timed out");
}
}
}
#[tokio::test]
// Tests a Goodbye RPC message
async fn test_goodbye_rpc() {
// set up the logging. The level and enabled logging or not
let log_level = Level::Trace;
let enable_logging = false;
let log = common::build_log(log_level, enable_logging);
// get sender/receiver
let (mut sender, mut receiver) = common::build_node_pair(&log).await;
// Goodbye Request
let rpc_request = Request::Goodbye(GoodbyeReason::ClientShutdown);
// build the sender future
let sender_future = async {
loop {
match sender.next_event().await {
Libp2pEvent::PeerConnected { peer_id, .. } => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
.swarm
.send_request(peer_id, RequestId::Sync(10), rpc_request.clone());
}
_ => {} // Ignore other RPC messages
}
}
};
// build the receiver future
let receiver_future = async {
loop {
match receiver.next_event().await {
Libp2pEvent::Behaviour(BehaviourEvent::RequestReceived {
peer_id: _,
id: _,
request,
}) => {
// Should receive sent RPC request
assert_eq!(rpc_request.clone(), request); // receives the goodbye. Nothing left to do
return;
}
_ => {} // Ignore other events
}
}
};
tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = delay_for(Duration::from_secs(30)) => {
panic!("Future timed out");
}
}
}