Update to lastest libp2p

This commit is contained in:
Age Manning
2019-05-13 17:50:11 +10:00
parent f7c2e4c5af
commit fc8dc6dfa7
4 changed files with 31 additions and 301 deletions

View File

@@ -3,7 +3,6 @@ use crate::rpc::{RPCEvent, RPCMessage, Rpc};
use crate::NetworkConfig;
use crate::{Topic, TopicHash};
use futures::prelude::*;
use libp2p::Multiaddr;
use libp2p::{
core::{
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
@@ -12,23 +11,24 @@ use libp2p::{
gossipsub::{Gossipsub, GossipsubEvent},
identify::{protocol::IdentifyInfo, Identify, IdentifyEvent},
kad::KademliaOut,
ping::{Ping, PingEvent},
ping::{Ping, PingConfig, PingEvent},
tokio_io::{AsyncRead, AsyncWrite},
NetworkBehaviour, PeerId,
};
use slog::{debug, o, trace, warn};
use ssz::{ssz_encode, Decode, DecodeError, Encode};
use std::collections::HashMap;
use std::num::NonZeroU32;
use std::time::{Duration, Instant};
use tokio_timer::Delay;
use std::collections::HashMap;
use types::{Attestation, BeaconBlock};
/// Builds the network behaviour that manages the core protocols of eth2.
/// This core behaviour is managed by `Behaviour` which adds peer management to all core
/// behaviours.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "CoreBehaviourEvent", poll_method = "poll")]
pub struct CoreCoreBehaviourTSubstream: AsyncRead + AsyncWrite> {
#[behaviour(out_event = "BehaviourEvent", poll_method = "poll")]
pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite> {
/// The routing pub-sub mechanism for eth2.
gossipsub: Gossipsub<TSubstream>,
/// The serenity RPC specified in the wire-0 protocol.
@@ -41,7 +41,7 @@ pub struct CoreCoreBehaviourTSubstream: AsyncRead + AsyncWrite> {
discovery: Discovery<TSubstream>,
#[behaviour(ignore)]
/// The events generated by this behaviour to be consumed in the swarm poll.
events: Vec<CoreBehaviourEvent>,
events: Vec<BehaviourEvent>,
/// Logger for behaviour actions.
#[behaviour(ignore)]
log: slog::Logger,
@@ -49,7 +49,7 @@ pub struct CoreCoreBehaviourTSubstream: AsyncRead + AsyncWrite> {
// Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<GossipsubEvent>
for CoreBehaviourTSubstream>
for Behaviour<TSubstream>
{
fn inject_event(&mut self, event: GossipsubEvent) {
match event {
@@ -82,7 +82,7 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<GossipsubE
}
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<RPCMessage>
for CoreBehaviourTSubstream>
for Behaviour<TSubstream>
{
fn inject_event(&mut self, event: RPCMessage) {
match event {
@@ -97,7 +97,7 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<RPCMessage
}
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<IdentifyEvent>
for CoreBehaviourTSubstream>
for Behaviour<TSubstream>
{
fn inject_event(&mut self, event: IdentifyEvent) {
match event {
@@ -127,7 +127,7 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<IdentifyEv
}
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<PingEvent>
for CoreBehaviourTSubstream>
for Behaviour<TSubstream>
{
fn inject_event(&mut self, _event: PingEvent) {
// not interested in ping responses at the moment.
@@ -136,19 +136,28 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<PingEvent>
// implement the discovery behaviour (currently kademlia)
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<KademliaOut>
for CoreBehaviourTSubstream>
for Behaviour<TSubstream>
{
fn inject_event(&mut self, _out: KademliaOut) {
// not interested in kademlia results at the moment
}
}
impl<TSubstream: AsyncRead + AsyncWrite> CoreBehaviourTSubstream> {
impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
pub fn new(local_public_key: PublicKey, net_conf: &NetworkConfig, log: &slog::Logger) -> Self {
let local_peer_id = local_public_key.clone().into_peer_id();
let identify_config = net_conf.identify_config.clone();
let behaviour_log = log.new(o!());
// identify configuration
let identify_config = net_conf.identify_config.clone();
// ping configuration
let ping_config = PingConfig::new()
.with_timeout(Duration::from_secs(30))
.with_interval(Duration::from_secs(20))
.with_max_failures(NonZeroU32::new(2).expect("2 != 0"))
.with_keep_alive(false);
Behaviour {
serenity_rpc: Rpc::new(log),
gossipsub: Gossipsub::new(local_peer_id.clone(), net_conf.gs_config.clone()),
@@ -158,7 +167,7 @@ impl<TSubstream: AsyncRead + AsyncWrite> CoreBehaviourTSubstream> {
identify_config.user_agent,
local_public_key,
),
ping: Ping::new(),
ping: Ping::new(ping_config),
events: Vec::new(),
log: behaviour_log,
}
@@ -177,7 +186,7 @@ impl<TSubstream: AsyncRead + AsyncWrite> CoreBehaviourTSubstream> {
}
/// Implements the combined behaviour for the libp2p service.
impl<TSubstream: AsyncRead + AsyncWrite> CoreBehaviourTSubstream> {
impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
/* Pubsub behaviour functions */
/// Subscribes to a gossipsub topic.
@@ -199,11 +208,10 @@ impl<TSubstream: AsyncRead + AsyncWrite> CoreBehaviourTSubstream> {
pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
self.serenity_rpc.send_rpc(peer_id, rpc_event);
}
}
/// The types of events than can be obtained from polling the behaviour.
pub enum CoreBehaviourEvent {
pub enum BehaviourEvent {
RPC(PeerId, RPCEvent),
PeerDialed(PeerId),
Identified(PeerId, Box<IdentifyInfo>),