Initial core grouping of libp2p behaviours

This commit is contained in:
Age Manning
2019-04-30 15:12:57 +10:00
parent 75959cc9a2
commit f7c2e4c5af
6 changed files with 311 additions and 35 deletions

View File

@@ -3,6 +3,7 @@ use crate::rpc::{RPCEvent, RPCMessage, Rpc};
use crate::NetworkConfig;
use crate::{Topic, TopicHash};
use futures::prelude::*;
use libp2p::Multiaddr;
use libp2p::{
core::{
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
@@ -19,16 +20,18 @@ use slog::{debug, o, trace, warn};
use ssz::{ssz_encode, Decode, DecodeError, Encode};
use std::time::{Duration, Instant};
use tokio_timer::Delay;
use std::collections::HashMap;
use types::{Attestation, BeaconBlock};
/// Builds the network behaviour for the libp2p Swarm.
/// Implements gossipsub message routing.
/// Builds the network behaviour that manages the core protocols of eth2.
/// This core behaviour is managed by `Behaviour` which adds peer management to all core
/// behaviours.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourEvent", poll_method = "poll")]
pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite> {
#[behaviour(out_event = "CoreBehaviourEvent", poll_method = "poll")]
pub struct CoreCoreBehaviourTSubstream: AsyncRead + AsyncWrite> {
/// The routing pub-sub mechanism for eth2.
gossipsub: Gossipsub<TSubstream>,
/// The events generated by this behaviour to be consumed in the swarm poll.
/// The serenity RPC specified in the wire-0 protocol.
serenity_rpc: Rpc<TSubstream>,
/// Allows discovery of IP addresses for peers on the network.
identify: Identify<TSubstream>,
@@ -36,9 +39,9 @@ pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite> {
ping: Ping<TSubstream>,
/// Kademlia for peer discovery.
discovery: Discovery<TSubstream>,
/// Queue of behaviour events to be processed.
#[behaviour(ignore)]
events: Vec<BehaviourEvent>,
/// The events generated by this behaviour to be consumed in the swarm poll.
events: Vec<CoreBehaviourEvent>,
/// Logger for behaviour actions.
#[behaviour(ignore)]
log: slog::Logger,
@@ -46,7 +49,7 @@ pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite> {
// Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<GossipsubEvent>
for Behaviour<TSubstream>
for CoreBehaviourTSubstream>
{
fn inject_event(&mut self, event: GossipsubEvent) {
match event {
@@ -79,7 +82,7 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<GossipsubE
}
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<RPCMessage>
for Behaviour<TSubstream>
for CoreBehaviourTSubstream>
{
fn inject_event(&mut self, event: RPCMessage) {
match event {
@@ -94,7 +97,7 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<RPCMessage
}
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<IdentifyEvent>
for Behaviour<TSubstream>
for CoreBehaviourTSubstream>
{
fn inject_event(&mut self, event: IdentifyEvent) {
match event {
@@ -124,7 +127,7 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<IdentifyEv
}
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<PingEvent>
for Behaviour<TSubstream>
for CoreBehaviourTSubstream>
{
fn inject_event(&mut self, _event: PingEvent) {
// not interested in ping responses at the moment.
@@ -133,14 +136,14 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<PingEvent>
// implement the discovery behaviour (currently kademlia)
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<KademliaOut>
for Behaviour<TSubstream>
for CoreBehaviourTSubstream>
{
fn inject_event(&mut self, _out: KademliaOut) {
// not interested in kademlia results at the moment
}
}
impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
impl<TSubstream: AsyncRead + AsyncWrite> CoreBehaviourTSubstream> {
pub fn new(local_public_key: PublicKey, net_conf: &NetworkConfig, log: &slog::Logger) -> Self {
let local_peer_id = local_public_key.clone().into_peer_id();
let identify_config = net_conf.identify_config.clone();
@@ -174,17 +177,14 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
}
/// Implements the combined behaviour for the libp2p service.
impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
impl<TSubstream: AsyncRead + AsyncWrite> CoreBehaviourTSubstream> {
/* Pubsub behaviour functions */
/// Subscribes to a gossipsub topic.
pub fn subscribe(&mut self, topic: Topic) -> bool {
self.gossipsub.subscribe(topic)
}
/// Sends an RPC Request/Response via the RPC protocol.
pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
self.serenity_rpc.send_rpc(peer_id, rpc_event);
}
/// Publishes a message on the pubsub (gossipsub) behaviour.
pub fn publish(&mut self, topics: Vec<Topic>, message: PubsubMessage) {
let message_bytes = ssz_encode(&message);
@@ -192,10 +192,18 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
self.gossipsub.publish(topic, message_bytes.clone());
}
}
/* Eth2 RPC behaviour functions */
/// Sends an RPC Request/Response via the RPC protocol.
pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
self.serenity_rpc.send_rpc(peer_id, rpc_event);
}
}
/// The types of events than can be obtained from polling the behaviour.
pub enum BehaviourEvent {
pub enum CoreBehaviourEvent {
RPC(PeerId, RPCEvent),
PeerDialed(PeerId),
Identified(PeerId, Box<IdentifyInfo>),