Merge branch 'master' into paul-sync

This commit is contained in:
Paul Hauner
2019-03-22 07:11:04 +11:00
62 changed files with 1506 additions and 1386 deletions

View File

@@ -0,0 +1,18 @@
[package]
name = "eth2-libp2p"
version = "0.1.0"
authors = ["Age Manning <Age@AgeManning.com>"]
edition = "2018"
[dependencies]
beacon_chain = { path = "../beacon_chain" }
# SigP repository until PR is merged
libp2p = { git = "https://github.com/SigP/rust-libp2p", branch = "gossipsub" }
types = { path = "../../eth2/types" }
ssz = { path = "../../eth2/utils/ssz" }
ssz_derive = { path = "../../eth2/utils/ssz_derive" }
slog = "2.4.1"
version = { path = "../version" }
tokio = "0.1.16"
futures = "0.1.25"
error-chain = "0.12.0"

View File

@@ -0,0 +1,96 @@
use crate::rpc::{RPCEvent, RPCMessage, Rpc};
use futures::prelude::*;
use libp2p::{
core::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent},
tokio_io::{AsyncRead, AsyncWrite},
NetworkBehaviour, PeerId,
};
use types::Topic;
/// Builds the network behaviour for the libp2p Swarm.
/// Implements gossipsub message routing.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourEvent", poll_method = "poll")]
pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite> {
gossipsub: Gossipsub<TSubstream>,
// TODO: Add Kademlia for peer discovery
/// The events generated by this behaviour to be consumed in the swarm poll.
serenity_rpc: Rpc<TSubstream>,
#[behaviour(ignore)]
events: Vec<BehaviourEvent>,
}
// Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<GossipsubEvent>
for Behaviour<TSubstream>
{
fn inject_event(&mut self, event: GossipsubEvent) {
match event {
GossipsubEvent::Message(message) => {
let gs_message = String::from_utf8_lossy(&message.data);
// TODO: Remove this type - debug only
self.events
.push(BehaviourEvent::Message(gs_message.to_string()))
}
_ => {}
}
}
}
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<RPCMessage>
for Behaviour<TSubstream>
{
fn inject_event(&mut self, event: RPCMessage) {
match event {
RPCMessage::PeerDialed(peer_id) => {
self.events.push(BehaviourEvent::PeerDialed(peer_id))
}
RPCMessage::RPC(peer_id, rpc_event) => {
self.events.push(BehaviourEvent::RPC(peer_id, rpc_event))
}
}
}
}
impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
pub fn new(local_peer_id: PeerId, gs_config: GossipsubConfig, log: &slog::Logger) -> Self {
Behaviour {
gossipsub: Gossipsub::new(local_peer_id, gs_config),
serenity_rpc: Rpc::new(log),
events: Vec::new(),
}
}
/// Consumes the events list when polled.
fn poll<TBehaviourIn>(
&mut self,
) -> Async<NetworkBehaviourAction<TBehaviourIn, BehaviourEvent>> {
if !self.events.is_empty() {
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)));
}
Async::NotReady
}
}
/// Implements the combined behaviour for the libp2p service.
impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
/// Subscribes to a gossipsub topic.
pub fn subscribe(&mut self, topic: Topic) -> bool {
self.gossipsub.subscribe(topic)
}
/// Sends an RPC Request/Response via the RPC protocol.
pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
self.serenity_rpc.send_rpc(peer_id, rpc_event);
}
}
/// The types of events than can be obtained from polling the behaviour.
pub enum BehaviourEvent {
RPC(PeerId, RPCEvent),
PeerDialed(PeerId),
// TODO: This is a stub at the moment
Message(String),
}

View File

@@ -0,0 +1,8 @@
// generates error types
use error_chain::{
error_chain, error_chain_processing, impl_error_chain_kind, impl_error_chain_processed,
impl_extract_backtrace,
};
error_chain! {}

View File

@@ -0,0 +1,20 @@
/// This crate contains the main link for lighthouse to rust-libp2p. It therefore re-exports
/// all required libp2p functionality.
///
/// This crate builds and manages the libp2p services required by the beacon node.
pub mod behaviour;
pub mod error;
mod network_config;
pub mod rpc;
mod service;
pub use libp2p::{
gossipsub::{GossipsubConfig, GossipsubConfigBuilder},
PeerId,
};
pub use network_config::NetworkConfig;
pub use rpc::{HelloMessage, RPCEvent};
pub use service::Libp2pEvent;
pub use service::Service;
pub use types::multiaddr;
pub use types::Multiaddr;

View File

@@ -0,0 +1,59 @@
use crate::Multiaddr;
use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder};
use libp2p::secio;
use std::fmt;
#[derive(Clone)]
/// Network configuration for lighthouse.
pub struct NetworkConfig {
//TODO: stubbing networking initial params, change in the future
/// IP address to listen on.
pub listen_addresses: Vec<Multiaddr>,
/// Listen port UDP/TCP.
pub listen_port: u16,
/// Gossipsub configuration parameters.
pub gs_config: GossipsubConfig,
/// List of nodes to initially connect to.
pub boot_nodes: Vec<Multiaddr>,
/// Peer key related to this nodes PeerId.
pub local_private_key: secio::SecioKeyPair,
/// Client version
pub client_version: String,
/// List of topics to subscribe to as strings
pub topics: Vec<String>,
}
impl Default for NetworkConfig {
/// Generate a default network configuration.
fn default() -> Self {
// TODO: Currently using secp256k1 key pairs. Wire protocol specifies RSA. Waiting for this
// PR to be merged to generate RSA keys: https://github.com/briansmith/ring/pull/733
NetworkConfig {
listen_addresses: vec!["/ip4/127.0.0.1/tcp/9000"
.parse()
.expect("is a correct multi-address")],
listen_port: 9000,
gs_config: GossipsubConfigBuilder::new().build(),
boot_nodes: Vec::new(),
local_private_key: secio::SecioKeyPair::secp256k1_generated().unwrap(),
client_version: version::version(),
topics: vec![String::from("beacon_chain")],
}
}
}
impl NetworkConfig {
pub fn new(boot_nodes: Vec<Multiaddr>) -> Self {
let mut conf = NetworkConfig::default();
conf.boot_nodes = boot_nodes;
conf
}
}
impl fmt::Debug for NetworkConfig {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "NetworkConfig: listen_addresses: {:?}, listen_port: {:?}, gs_config: {:?}, boot_nodes: {:?}, local_private_key: <Secio-PubKey {:?}>, client_version: {:?}", self.listen_addresses, self.listen_port, self.gs_config, self.boot_nodes, self.local_private_key.to_public_key(), self.client_version)
}
}

View File

@@ -0,0 +1,188 @@
/// Available RPC methods types and ids.
use ssz_derive::{Decode, Encode};
use types::{BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot};
#[derive(Debug)]
/// Available Serenity Libp2p RPC methods
pub enum RPCMethod {
/// Initialise handshake between connecting peers.
Hello,
/// Terminate a connection providing a reason.
Goodbye,
/// Requests a number of beacon block roots.
BeaconBlockRoots,
/// Requests a number of beacon block headers.
BeaconBlockHeaders,
/// Requests a number of beacon block bodies.
BeaconBlockBodies,
/// Requests values for a merkle proof for the current blocks state root.
BeaconChainState, // Note: experimental, not complete.
/// Unknown method received.
Unknown,
}
impl From<u16> for RPCMethod {
fn from(method_id: u16) -> Self {
match method_id {
0 => RPCMethod::Hello,
1 => RPCMethod::Goodbye,
10 => RPCMethod::BeaconBlockRoots,
11 => RPCMethod::BeaconBlockHeaders,
12 => RPCMethod::BeaconBlockBodies,
13 => RPCMethod::BeaconChainState,
_ => RPCMethod::Unknown,
}
}
}
impl Into<u16> for RPCMethod {
fn into(self) -> u16 {
match self {
RPCMethod::Hello => 0,
RPCMethod::Goodbye => 1,
RPCMethod::BeaconBlockRoots => 10,
RPCMethod::BeaconBlockHeaders => 11,
RPCMethod::BeaconBlockBodies => 12,
RPCMethod::BeaconChainState => 13,
_ => 0,
}
}
}
#[derive(Debug, Clone)]
pub enum RPCRequest {
Hello(HelloMessage),
Goodbye(u64),
BeaconBlockRoots(BeaconBlockRootsRequest),
BeaconBlockHeaders(BeaconBlockHeadersRequest),
BeaconBlockBodies(BeaconBlockBodiesRequest),
BeaconChainState(BeaconChainStateRequest),
}
impl RPCRequest {
pub fn method_id(&self) -> u16 {
let method = match self {
RPCRequest::Hello(_) => RPCMethod::Hello,
RPCRequest::Goodbye(_) => RPCMethod::Goodbye,
RPCRequest::BeaconBlockRoots(_) => RPCMethod::BeaconBlockRoots,
RPCRequest::BeaconBlockHeaders(_) => RPCMethod::BeaconBlockHeaders,
RPCRequest::BeaconBlockBodies(_) => RPCMethod::BeaconBlockBodies,
RPCRequest::BeaconChainState(_) => RPCMethod::BeaconChainState,
};
method.into()
}
}
#[derive(Debug, Clone)]
pub enum RPCResponse {
Hello(HelloMessage),
BeaconBlockRoots(BeaconBlockRootsResponse),
BeaconBlockHeaders(BeaconBlockHeadersResponse),
BeaconBlockBodies(BeaconBlockBodiesResponse),
BeaconChainState(BeaconChainStateResponse),
}
impl RPCResponse {
pub fn method_id(&self) -> u16 {
let method = match self {
RPCResponse::Hello(_) => RPCMethod::Hello,
RPCResponse::BeaconBlockRoots(_) => RPCMethod::BeaconBlockRoots,
RPCResponse::BeaconBlockHeaders(_) => RPCMethod::BeaconBlockHeaders,
RPCResponse::BeaconBlockBodies(_) => RPCMethod::BeaconBlockBodies,
RPCResponse::BeaconChainState(_) => RPCMethod::BeaconChainState,
};
method.into()
}
}
/* Request/Response data structures for RPC methods */
/// The HELLO request/response handshake message.
#[derive(Encode, Decode, Clone, Debug)]
pub struct HelloMessage {
/// The network ID of the peer.
pub network_id: u8,
/// The peers last finalized root.
pub latest_finalized_root: Hash256,
/// The peers last finalized epoch.
pub latest_finalized_epoch: Epoch,
/// The peers last block root.
pub best_root: Hash256,
/// The peers last slot.
pub best_slot: Slot,
}
/// Request a number of beacon block roots from a peer.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BeaconBlockRootsRequest {
/// The starting slot of the requested blocks.
pub start_slot: Slot,
/// The number of blocks from the start slot.
pub count: u64, // this must be less than 32768. //TODO: Enforce this in the lower layers
}
/// Response containing a number of beacon block roots from a peer.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BeaconBlockRootsResponse {
/// List of requested blocks and associated slots.
pub roots: Vec<BlockRootSlot>,
}
/// Contains a block root and associated slot.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BlockRootSlot {
/// The block root.
pub block_root: Hash256,
/// The block slot.
pub slot: Slot,
}
/// Request a number of beacon block headers from a peer.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BeaconBlockHeadersRequest {
/// The starting header hash of the requested headers.
pub start_root: Hash256,
/// The starting slot of the requested headers.
pub start_slot: Slot,
/// The maximum number of headers than can be returned.
pub max_headers: u64,
/// The maximum number of slots to skip between blocks.
pub skip_slots: u64,
}
/// Response containing requested block headers.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BeaconBlockHeadersResponse {
/// The list of requested beacon block headers.
pub headers: Vec<BeaconBlockHeader>,
}
/// Request a number of beacon block bodies from a peer.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BeaconBlockBodiesRequest {
/// The list of beacon block bodies being requested.
pub block_roots: Hash256,
}
/// Response containing the list of requested beacon block bodies.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BeaconBlockBodiesResponse {
/// The list of beacon block bodies being requested.
pub block_bodies: Vec<BeaconBlockBody>,
}
/// Request values for tree hashes which yield a blocks `state_root`.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BeaconChainStateRequest {
/// The tree hashes that a value is requested for.
pub hashes: Vec<Hash256>,
}
/// Request values for tree hashes which yield a blocks `state_root`.
// Note: TBD
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BeaconChainStateResponse {
/// The values corresponding the to the requested tree hashes.
pub values: bool, //TBD - stubbed with encodeable bool
}

View File

@@ -0,0 +1,138 @@
/// RPC Protocol over libp2p.
///
/// This is purpose built for Ethereum 2.0 serenity and the protocol listens on
/// `/eth/serenity/rpc/1.0.0`
pub mod methods;
mod protocol;
use futures::prelude::*;
use libp2p::core::protocols_handler::{OneShotHandler, ProtocolsHandler};
use libp2p::core::swarm::{
ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
};
use libp2p::{Multiaddr, PeerId};
pub use methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse};
pub use protocol::{RPCEvent, RPCProtocol};
use slog::o;
use std::marker::PhantomData;
use tokio::io::{AsyncRead, AsyncWrite};
/// The network behaviour handles RPC requests/responses as specified in the Eth 2.0 phase 0
/// specification.
pub struct Rpc<TSubstream> {
/// Queue of events to processed.
events: Vec<NetworkBehaviourAction<RPCEvent, RPCMessage>>,
/// Pins the generic substream.
marker: PhantomData<TSubstream>,
/// Slog logger for RPC behaviour.
log: slog::Logger,
}
impl<TSubstream> Rpc<TSubstream> {
pub fn new(log: &slog::Logger) -> Self {
let log = log.new(o!("Service" => "Libp2p-RPC"));
Rpc {
events: Vec::new(),
marker: PhantomData,
log,
}
}
/// Submits and RPC request.
pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id,
event: rpc_event,
});
}
}
impl<TSubstream> NetworkBehaviour for Rpc<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
type ProtocolsHandler = OneShotHandler<TSubstream, RPCProtocol, RPCEvent, OneShotEvent>;
type OutEvent = RPCMessage;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
Default::default()
}
fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec<Multiaddr> {
Vec::new()
}
fn inject_connected(&mut self, peer_id: PeerId, connected_point: ConnectedPoint) {
// if initialised the connection, report this upwards to send the HELLO request
if let ConnectedPoint::Dialer { address: _ } = connected_point {
self.events.push(NetworkBehaviourAction::GenerateEvent(
RPCMessage::PeerDialed(peer_id),
));
}
}
fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {}
fn inject_node_event(
&mut self,
source: PeerId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
// ignore successful send events
let event = match event {
OneShotEvent::Rx(event) => event,
OneShotEvent::Sent => return,
};
// send the event to the user
self.events
.push(NetworkBehaviourAction::GenerateEvent(RPCMessage::RPC(
source, event,
)));
}
fn poll(
&mut self,
_: &mut PollParameters<'_>,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
if !self.events.is_empty() {
return Async::Ready(self.events.remove(0));
}
Async::NotReady
}
}
/// Messages sent to the user from the RPC protocol.
pub enum RPCMessage {
RPC(PeerId, RPCEvent),
PeerDialed(PeerId),
}
/// Transmission between the `OneShotHandler` and the `RPCEvent`.
#[derive(Debug)]
pub enum OneShotEvent {
/// We received an RPC from a remote.
Rx(RPCEvent),
/// We successfully sent an RPC request.
Sent,
}
impl From<RPCEvent> for OneShotEvent {
#[inline]
fn from(rpc: RPCEvent) -> OneShotEvent {
OneShotEvent::Rx(rpc)
}
}
impl From<()> for OneShotEvent {
#[inline]
fn from(_: ()) -> OneShotEvent {
OneShotEvent::Sent
}
}

View File

@@ -0,0 +1,181 @@
use super::methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse};
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use ssz::{ssz_encode, Decodable, Encodable, SszStream};
use std::io;
use std::iter;
use tokio::io::{AsyncRead, AsyncWrite};
/// The maximum bytes that can be sent across the RPC.
const MAX_READ_SIZE: usize = 2048;
/// Implementation of the `ConnectionUpgrade` for the rpc protocol.
#[derive(Debug, Clone)]
pub struct RPCProtocol;
impl UpgradeInfo for RPCProtocol {
type Info = &'static [u8];
type InfoIter = iter::Once<Self::Info>;
#[inline]
fn protocol_info(&self) -> Self::InfoIter {
iter::once(b"/eth/serenity/rpc/1.0.0")
}
}
impl Default for RPCProtocol {
fn default() -> Self {
RPCProtocol
}
}
/// The RPC types which are sent/received in this protocol.
#[derive(Debug, Clone)]
pub enum RPCEvent {
Request {
id: u64,
method_id: u16,
body: RPCRequest,
},
Response {
id: u64,
method_id: u16, //TODO: Remove and process decoding upstream
result: RPCResponse,
},
}
impl UpgradeInfo for RPCEvent {
type Info = &'static [u8];
type InfoIter = iter::Once<Self::Info>;
#[inline]
fn protocol_info(&self) -> Self::InfoIter {
iter::once(b"/eth/serenity/rpc/1.0.0")
}
}
impl<TSocket> InboundUpgrade<TSocket> for RPCProtocol
where
TSocket: AsyncRead + AsyncWrite,
{
type Output = RPCEvent;
type Error = DecodeError;
type Future =
upgrade::ReadOneThen<TSocket, (), fn(Vec<u8>, ()) -> Result<RPCEvent, DecodeError>>;
fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
upgrade::read_one_then(socket, MAX_READ_SIZE, (), |packet, ()| Ok(decode(packet)?))
}
}
fn decode(packet: Vec<u8>) -> Result<RPCEvent, DecodeError> {
// decode the header of the rpc
// request/response
let (request, index) = bool::ssz_decode(&packet, 0)?;
let (id, index) = u64::ssz_decode(&packet, index)?;
let (method_id, index) = u16::ssz_decode(&packet, index)?;
if request {
let body = match RPCMethod::from(method_id) {
RPCMethod::Hello => {
let (hello_body, _index) = HelloMessage::ssz_decode(&packet, index)?;
RPCRequest::Hello(hello_body)
}
RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod),
};
Ok(RPCEvent::Request {
id,
method_id,
body,
})
}
// we have received a response
else {
let result = match RPCMethod::from(method_id) {
RPCMethod::Hello => {
let (body, _index) = HelloMessage::ssz_decode(&packet, index)?;
RPCResponse::Hello(body)
}
RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod),
};
Ok(RPCEvent::Response {
id,
method_id,
result,
})
}
}
impl<TSocket> OutboundUpgrade<TSocket> for RPCEvent
where
TSocket: AsyncWrite,
{
type Output = ();
type Error = io::Error;
type Future = upgrade::WriteOne<TSocket>;
#[inline]
fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
let bytes = ssz_encode(&self);
upgrade::write_one(socket, bytes)
}
}
impl Encodable for RPCEvent {
fn ssz_append(&self, s: &mut SszStream) {
match self {
RPCEvent::Request {
id,
method_id,
body,
} => {
s.append(&true);
s.append(id);
s.append(method_id);
match body {
RPCRequest::Hello(body) => {
s.append(body);
}
_ => {}
}
}
RPCEvent::Response {
id,
method_id,
result,
} => {
s.append(&false);
s.append(id);
s.append(method_id);
match result {
RPCResponse::Hello(response) => {
s.append(response);
}
_ => {}
}
}
}
}
}
#[derive(Debug)]
pub enum DecodeError {
ReadError(upgrade::ReadOneError),
SSZDecodeError(ssz::DecodeError),
UnknownRPCMethod,
}
impl From<upgrade::ReadOneError> for DecodeError {
#[inline]
fn from(err: upgrade::ReadOneError) -> Self {
DecodeError::ReadError(err)
}
}
impl From<ssz::DecodeError> for DecodeError {
#[inline]
fn from(err: ssz::DecodeError) -> Self {
DecodeError::SSZDecodeError(err)
}
}

View File

@@ -0,0 +1,163 @@
use crate::behaviour::{Behaviour, BehaviourEvent};
use crate::error;
use crate::multiaddr::Protocol;
use crate::rpc::RPCEvent;
use crate::NetworkConfig;
use futures::prelude::*;
use futures::Stream;
use libp2p::core::{
muxing::StreamMuxerBox,
nodes::Substream,
transport::boxed::Boxed,
upgrade::{InboundUpgradeExt, OutboundUpgradeExt},
};
use libp2p::{core, secio, Transport};
use libp2p::{PeerId, Swarm};
use slog::{debug, info, trace, warn};
use std::io::{Error, ErrorKind};
use std::time::Duration;
use types::TopicBuilder;
/// The configuration and state of the libp2p components for the beacon node.
pub struct Service {
/// The libp2p Swarm handler.
//TODO: Make this private
pub swarm: Swarm<Boxed<(PeerId, StreamMuxerBox), Error>, Behaviour<Substream<StreamMuxerBox>>>,
/// This node's PeerId.
local_peer_id: PeerId,
/// The libp2p logger handle.
pub log: slog::Logger,
}
impl Service {
pub fn new(config: NetworkConfig, log: slog::Logger) -> error::Result<Self> {
debug!(log, "Libp2p Service starting");
let local_private_key = config.local_private_key;
let local_peer_id = local_private_key.to_peer_id();
info!(log, "Local peer id: {:?}", local_peer_id);
let mut swarm = {
// Set up the transport
let transport = build_transport(local_private_key);
// Set up gossipsub routing
let behaviour = Behaviour::new(local_peer_id.clone(), config.gs_config, &log);
// Set up Topology
let topology = local_peer_id.clone();
Swarm::new(transport, behaviour, topology)
};
// listen on all addresses
for address in &config.listen_addresses {
match Swarm::listen_on(&mut swarm, address.clone()) {
Ok(mut listen_addr) => {
listen_addr.append(Protocol::P2p(local_peer_id.clone().into()));
info!(log, "Listening on: {}", listen_addr);
}
Err(err) => warn!(log, "Cannot listen on: {} : {:?}", address, err),
};
}
// connect to boot nodes - these are currently stored as multiaddrs
// Once we have discovery, can set to peerId
for bootnode in config.boot_nodes {
match Swarm::dial_addr(&mut swarm, bootnode.clone()) {
Ok(()) => debug!(log, "Dialing bootnode: {}", bootnode),
Err(err) => debug!(
log,
"Could not connect to bootnode: {} error: {:?}", bootnode, err
),
};
}
// subscribe to default gossipsub topics
let mut subscribed_topics = vec![];
for topic in config.topics {
let t = TopicBuilder::new(topic.to_string()).build();
if swarm.subscribe(t) {
trace!(log, "Subscribed to topic: {:?}", topic);
subscribed_topics.push(topic);
} else {
warn!(log, "Could not subscribe to topic: {:?}", topic)
}
}
info!(log, "Subscribed to topics: {:?}", subscribed_topics);
Ok(Service {
local_peer_id,
swarm,
log,
})
}
}
impl Stream for Service {
type Item = Libp2pEvent;
type Error = crate::error::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
// TODO: Currently only gossipsub events passed here.
// Build a type for more generic events
match self.swarm.poll() {
Ok(Async::Ready(Some(BehaviourEvent::Message(m)))) => {
// TODO: Stub here for debugging
debug!(self.log, "Message received: {}", m);
return Ok(Async::Ready(Some(Libp2pEvent::Message(m))));
}
Ok(Async::Ready(Some(BehaviourEvent::RPC(peer_id, event)))) => {
return Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, event))));
}
Ok(Async::Ready(Some(BehaviourEvent::PeerDialed(peer_id)))) => {
return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id))));
}
Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"),
Ok(Async::NotReady) => break,
_ => break,
}
}
Ok(Async::NotReady)
}
}
/// The implementation supports TCP/IP, WebSockets over TCP/IP, secio as the encryption layer, and
/// mplex or yamux as the multiplexing layer.
fn build_transport(
local_private_key: secio::SecioKeyPair,
) -> Boxed<(PeerId, StreamMuxerBox), Error> {
// TODO: The Wire protocol currently doesn't specify encryption and this will need to be customised
// in the future.
let transport = libp2p::tcp::TcpConfig::new();
let transport = libp2p::dns::DnsConfig::new(transport);
#[cfg(feature = "libp2p-websocket")]
let transport = {
let trans_clone = transport.clone();
transport.or_transport(websocket::WsConfig::new(trans_clone))
};
transport
.with_upgrade(secio::SecioConfig::new(local_private_key))
.and_then(move |out, endpoint| {
let peer_id = out.remote_key.into_peer_id();
let peer_id2 = peer_id.clone();
let upgrade = core::upgrade::SelectUpgrade::new(
libp2p::yamux::Config::default(),
libp2p::mplex::MplexConfig::new(),
)
// TODO: use a single `.map` instead of two maps
.map_inbound(move |muxer| (peer_id, muxer))
.map_outbound(move |muxer| (peer_id2, muxer));
core::upgrade::apply(out.stream, upgrade, endpoint)
.map(|(id, muxer)| (id, core::muxing::StreamMuxerBox::new(muxer)))
})
.with_timeout(Duration::from_secs(20))
.map_err(|err| Error::new(ErrorKind::Other, err))
.boxed()
}
/// Events that can be obtained from polling the Libp2p Service.
pub enum Libp2pEvent {
// We have received an RPC event on the swarm
RPC(PeerId, RPCEvent),
PeerDialed(PeerId),
Message(String),
}