Update libp2p service, start rpc test upgrade

This commit is contained in:
Age Manning
2020-05-11 15:24:53 +10:00
parent e4d9978fb7
commit 3ee4c4c60b
7 changed files with 179 additions and 306 deletions

View File

@@ -32,11 +32,17 @@ snap = "1.0.0"
void = "1.0.2"
tokio-io-timeout = "0.4.0"
tokio-util = { version = "0.3.1", features = ["codec", "compat"] }
libp2p = "0.18.1"
discv5 = "0.1.0-alpha.1"
tiny-keccak = "2.0.2"
[dependencies.libp2p]
version = "0.18.1"
default-features = false
features = ["websocket", "ping", "identify", "mplex", "yamux", "noise", "secio", "gossipsub", "tcp", "dns"]
[dev-dependencies]
tokio = { version = "0.2.20", features = ["full"] }
slog-stdlog = "4.0.0"
slog-term = "2.5.0"
slog-async = "2.5.0"

View File

@@ -12,8 +12,13 @@ use libp2p::core::{
muxing::StreamMuxerBox,
transport::boxed::Boxed,
upgrade::{InboundUpgradeExt, OutboundUpgradeExt},
ConnectedPoint,
};
use libp2p::{
core, noise, secio,
swarm::{NetworkBehaviour, SwarmEvent},
PeerId, Swarm, Transport,
};
use libp2p::{core, noise, secio, swarm::NetworkBehaviour, PeerId, Swarm, Transport};
use slog::{crit, debug, error, info, trace, warn};
use std::fs::File;
use std::io::prelude::*;
@@ -30,6 +35,23 @@ pub const NETWORK_KEY_FILENAME: &str = "key";
/// flushed and protocols to be negotiated.
const BAN_PEER_WAIT_TIMEOUT: u64 = 200;
/// The types of events than can be obtained from polling the libp2p service.
///
/// This is a subset of the events that a libp2p swarm emits.
#[derive(Debug)]
pub enum Libp2pEvent<TSpec: EthSpec> {
/// A behaviour event
Behaviour(BehaviourEvent<TSpec>),
/// A new listening address has been established.
NewListenAddr(Multiaddr),
/// A connection has been established with a peer.
ConnectionEstablished {
peer_id: PeerId,
endpoint: ConnectedPoint,
num_established: u32,
},
}
/// The configuration and state of the libp2p components for the beacon node.
pub struct Service<TSpec: EthSpec> {
/// The libp2p Swarm handler.
@@ -178,54 +200,100 @@ impl<TSpec: EthSpec> Service<TSpec> {
}
}
// TODO: Convert to an async function via building a stored stream from libp2p swarm
impl<TSpec: EthSpec> Stream for Service<TSpec> {
type Item = Result<BehaviourEvent<TSpec>, error::Error>;
type Item = Libp2pEvent<TSpec>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let log = self.log.clone();
loop {
match self.swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => {
return Poll::Ready(Some(Ok(event)));
}
Poll::Ready(None) => unreachable!("Swarm stream shouldn't end"),
// Process the next action coming from the network.
let libp2p_event = self.swarm.next_event();
futures::pin_mut!(libp2p_event);
let event = libp2p_event.poll_unpin(cx);
match event {
Poll::Pending => break,
}
}
// check if peers need to be banned
loop {
match self.peers_to_ban.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(peer_id))) => {
let peer_id = peer_id.into_inner();
Swarm::ban_peer_id(&mut self.swarm, peer_id.clone());
// TODO: Correctly notify protocols of the disconnect
// TODO: Also remove peer from the DHT: https://github.com/sigp/lighthouse/issues/629
self.swarm.inject_disconnected(&peer_id);
// inform the behaviour that the peer has been banned
self.swarm.peer_banned(peer_id);
Poll::Ready(SwarmEvent::Behaviour(behaviour)) => {
return Poll::Ready(Some(Libp2pEvent::Behaviour(behaviour)))
}
Poll::Pending | Poll::Ready(None) => break,
Poll::Ready(Some(Err(e))) => {
warn!(self.log, "Peer banning queue failed"; "error" => format!("{:?}", e));
Poll::Ready(SwarmEvent::ConnectionEstablished {
peer_id,
endpoint,
num_established,
}) => {
return Poll::Ready(Some(Libp2pEvent::ConnectionEstablished {
peer_id,
endpoint,
num_established: num_established.get(),
}))
}
Poll::Ready(SwarmEvent::NewListenAddr(multiaddr)) => {
return Poll::Ready(Some(Libp2pEvent::NewListenAddr(multiaddr)))
}
Poll::Ready(SwarmEvent::ConnectionClosed { peer_id, cause, .. }) => {
debug!(log, "Connection closed"; "peer_id"=> peer_id.to_string(), "cause" => cause.to_string());
}
Poll::Ready(SwarmEvent::IncomingConnection {
local_addr,
send_back_addr,
}) => {
debug!(log, "Incoming connection"; "our_addr" => local_addr.to_string(), "from" => send_back_addr.to_string())
}
Poll::Ready(SwarmEvent::IncomingConnectionError {
local_addr,
send_back_addr,
error,
}) => {
debug!(log, "Failed incoming connection"; "our_addr" => local_addr.to_string(), "from" => send_back_addr.to_string(), "error" => error.to_string())
}
Poll::Ready(SwarmEvent::BannedPeer {
peer_id,
endpoint: _,
}) => {
debug!(log, "Attempted to dial a banned peer"; "peer_id" => peer_id.to_string())
}
Poll::Ready(SwarmEvent::UnreachableAddr {
peer_id,
address,
error,
attempts_remaining,
}) => {
debug!(log, "Failed to dial address"; "peer_id" => peer_id.to_string(), "address" => address.to_string(), "error" => error.to_string(), "attempts_remaining" => attempts_remaining)
}
Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr { address, .. }) => {
debug!(log, "Peer not known at dialed address"; "address" => address.to_string())
}
Poll::Ready(SwarmEvent::ExpiredListenAddr(multiaddr)) => {
debug!(log, "Listen address expired"; "multiaddr" => multiaddr.to_string())
}
Poll::Ready(SwarmEvent::ListenerClosed { addresses, reason }) => {
debug!(log, "Listener closed"; "addresses" => format!("{:?}", addresses), "reason" => format!("{:?}", reason))
}
Poll::Ready(SwarmEvent::ListenerError { error }) => {
debug!(log, "Listener error"; "error" => format!("{:?}", error.to_string()))
}
Poll::Ready(SwarmEvent::Dialing(peer_id)) => {
trace!(log, "Dialing peer"; "peer" => peer_id.to_string());
}
}
}
// un-ban peer if it's timeout has expired
loop {
match self.peer_ban_timeout.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(peer_id))) => {
let peer_id = peer_id.into_inner();
debug!(self.log, "Peer has been unbanned"; "peer" => format!("{:?}", peer_id));
self.swarm.peer_unbanned(&peer_id);
Swarm::unban_peer_id(&mut self.swarm, peer_id);
}
Poll::Pending | Poll::Ready(None) => break,
Poll::Ready(Some(Err(e))) => {
warn!(self.log, "Peer banning timeout queue failed"; "error" => format!("{:?}", e));
}
}
while let Poll::Ready(Some(Ok(peer_to_ban))) = self.peers_to_ban.poll_next_unpin(cx) {
let peer_id = peer_to_ban.into_inner();
Swarm::ban_peer_id(&mut self.swarm, peer_id.clone());
// TODO: Correctly notify protocols of the disconnect
// TODO: Also remove peer from the DHT: https://github.com/sigp/lighthouse/issues/629
self.swarm.inject_disconnected(&peer_id);
// inform the behaviour that the peer has been banned
self.swarm.peer_banned(peer_id);
}
while let Poll::Ready(Some(Ok(peer_to_unban))) = self.peer_ban_timeout.poll_next_unpin(cx) {
debug!(self.log, "Peer has been unbanned"; "peer" => format!("{:?}", peer_to_unban));
let unban_peer = peer_to_unban.into_inner();
self.swarm.peer_unbanned(&unban_peer);
Swarm::unban_peer_id(&mut self.swarm, unban_peer);
}
Poll::Pending

View File

@@ -1,5 +1,6 @@
#![cfg(test)]
use eth2_libp2p::Enr;
use eth2_libp2p::EnrExt;
use eth2_libp2p::Multiaddr;
use eth2_libp2p::NetworkConfig;
use eth2_libp2p::Service as LibP2PService;
@@ -93,7 +94,6 @@ pub fn build_libp2p_instance(
#[allow(dead_code)]
pub fn get_enr(node: &LibP2PService<E>) -> Enr {
let enr = node.swarm.discovery().local_enr().clone();
dbg!(enr.multiaddr());
enr
}
@@ -121,7 +121,7 @@ pub fn build_full_mesh(log: slog::Logger, n: usize) -> Vec<LibP2PService<E>> {
nodes
}
// Constructs a pair of nodes with seperate loggers. The sender dials the receiver.
// Constructs a pair of nodes with separate loggers. The sender dials the receiver.
// This returns a (sender, receiver) pair.
#[allow(dead_code)]
pub fn build_node_pair(log: &slog::Logger) -> (LibP2PService<E>, LibP2PService<E>) {
@@ -132,8 +132,10 @@ pub fn build_node_pair(log: &slog::Logger) -> (LibP2PService<E>, LibP2PService<E
let receiver = build_libp2p_instance(vec![], None, receiver_log);
let receiver_multiaddr = receiver.swarm.discovery().local_enr().clone().multiaddr()[1].clone();
match libp2p::Swarm::dial_addr(&mut sender.swarm, receiver_multiaddr) {
Ok(()) => debug!(log, "Sender dialed receiver"),
match libp2p::Swarm::dial_addr(&mut sender.swarm, receiver_multiaddr.clone()) {
Ok(()) => {
debug!(log, "Sender dialed receiver"; "address" => format!("{:?}", receiver_multiaddr))
}
Err(_) => error!(log, "Dialing failed"),
};
(sender, receiver)

View File

@@ -1,3 +1,4 @@
/*
#![cfg(test)]
use crate::types::GossipEncoding;
use ::types::{BeaconBlock, EthSpec, MinimalEthSpec, Signature, SignedBeaconBlock};
@@ -149,3 +150,4 @@ fn test_gossipsub_full_mesh_publish() {
Ok(Async::NotReady)
}))
}
*/

View File

@@ -1,3 +1,4 @@
/*
#![cfg(test)]
use crate::behaviour::{Behaviour, BehaviourEvent};
use crate::multiaddr::Protocol;
@@ -179,3 +180,4 @@ fn test_secio_noise_fallback() {
);
assert!(test_result.load(Relaxed));
}
*/

View File

@@ -2,11 +2,12 @@
use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::*;
use eth2_libp2p::{BehaviourEvent, RPCEvent};
use slog::{warn, Level};
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
use futures::prelude::*;
use slog::{debug, error, warn, Level};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::prelude::*;
use tokio::time::delay_for;
use types::{
BeaconBlock, Epoch, EthSpec, Hash256, MinimalEthSpec, Signature, SignedBeaconBlock, Slot,
};
@@ -15,12 +16,12 @@ mod common;
type E = MinimalEthSpec;
#[test]
#[tokio::test]
// Tests the STATUS RPC message
fn test_status_rpc() {
async fn test_status_rpc() {
// set up the logging. The level and enabled logging or not
let log_level = Level::Trace;
let enable_logging = false;
let log_level = Level::Debug;
let enable_logging = true;
let log = common::build_log(log_level, enable_logging);
@@ -45,56 +46,52 @@ fn test_status_rpc() {
head_slot: Slot::new(1),
});
let sender_request = rpc_request.clone();
let sender_log = log.clone();
let sender_response = rpc_response.clone();
// build the sender future
let sender_future = future::poll_fn(move || -> Poll<bool, ()> {
loop {
match sender.poll().unwrap() {
Async::Ready(Some(BehaviourEvent::PeerDialed(peer_id))) => {
let sender_future = async {
while let Some(sender_event) = sender.next().await {
match sender_event {
Ok(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a STATUS message
warn!(sender_log, "Sending RPC");
debug!(log, "Sending RPC");
sender
.swarm
.send_rpc(peer_id, RPCEvent::Request(1, sender_request.clone()));
.send_rpc(peer_id, RPCEvent::Request(1, rpc_request.clone()));
}
Async::Ready(Some(BehaviourEvent::RPC(_, event))) => match event {
Ok(BehaviourEvent::RPC(_, event)) => match event {
// Should receive the RPC response
RPCEvent::Response(id, response @ RPCCodedResponse::Success(_)) => {
if id == 1 {
warn!(sender_log, "Sender Received");
debug!(log, "Sender Received");
let response = {
match response {
RPCCodedResponse::Success(r) => r,
_ => unreachable!(),
}
};
assert_eq!(response, sender_response.clone());
assert_eq!(response, rpc_response.clone());
warn!(sender_log, "Sender Completed");
return Ok(Async::Ready(true));
debug!(log, "Sender Completed");
return;
}
}
e => panic!("Received invalid RPC message {}", e),
},
Async::Ready(Some(_)) => (),
Async::Ready(None) | Async::NotReady => return Ok(Async::NotReady),
};
Err(e) => error!(log, "Error in sender: {}", e),
x => debug!(log, "Event:"; "e:"=> format!("{:?}",x)), // ignore other events
}
}
});
};
// build the receiver future
let receiver_future = future::poll_fn(move || -> Poll<bool, ()> {
loop {
match receiver.poll().unwrap() {
Async::Ready(Some(BehaviourEvent::RPC(peer_id, event))) => match event {
let receiver_future = async {
while let Some(recv_event) = receiver.next().await {
match recv_event {
Ok(BehaviourEvent::RPC(peer_id, event)) => match event {
// Should receive sent RPC request
RPCEvent::Request(id, request) => {
if request == rpc_request {
// send the response
warn!(log, "Receiver Received");
debug!(log, "Receiver Received");
receiver.swarm.send_rpc(
peer_id,
RPCEvent::Response(
@@ -106,28 +103,22 @@ fn test_status_rpc() {
}
e => panic!("Received invalid RPC message {}", e),
},
Async::Ready(Some(_)) => (),
Async::Ready(None) | Async::NotReady => return Ok(Async::NotReady),
Err(e) => error!(log, "Error in sender: {}", e),
_ => {} // ignore other events
}
}
});
};
// execute the futures and check the result
let test_result = Arc::new(AtomicBool::new(false));
let error_result = test_result.clone();
let thread_result = test_result.clone();
tokio::run(
sender_future
.select(receiver_future)
.timeout(Duration::from_millis(1000))
.map_err(move |_| error_result.store(false, Relaxed))
.map(move |result| {
thread_result.store(result.0, Relaxed);
}),
);
assert!(test_result.load(Relaxed));
tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = delay_for(Duration::from_millis(2800)) => {
panic!("Future timed out");
}
}
}
/*
#[test]
// Tests a streamed BlocksByRange RPC Message
fn test_blocks_by_range_chunked_rpc() {
@@ -601,3 +592,4 @@ fn test_goodbye_rpc() {
);
assert!(test_result.load(Relaxed));
}
*/