Update network crate for new libp2p

This commit is contained in:
Age Manning
2020-05-11 15:34:22 +10:00
parent 3ee4c4c60b
commit 1635ae8666
2 changed files with 18 additions and 23 deletions

View File

@@ -23,4 +23,4 @@ pub use libp2p::{multiaddr, Multiaddr};
pub use libp2p::{PeerId, Swarm}; pub use libp2p::{PeerId, Swarm};
pub use peer_manager::{PeerDB, PeerInfo, PeerSyncStatus, SyncInfo}; pub use peer_manager::{PeerDB, PeerInfo, PeerSyncStatus, SyncInfo};
pub use rpc::RPCEvent; pub use rpc::RPCEvent;
pub use service::{Service, NETWORK_KEY_FILENAME}; pub use service::{Libp2pEvent, Service, NETWORK_KEY_FILENAME};

View File

@@ -7,8 +7,8 @@ use crate::{
}; };
use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::Service as LibP2PService; use eth2_libp2p::Service as LibP2PService;
use eth2_libp2p::{rpc::RPCRequest, BehaviourEvent, Enr, MessageId, NetworkGlobals, PeerId, Swarm}; use eth2_libp2p::{rpc::RPCRequest, BehaviourEvent, Enr, MessageId, NetworkGlobals, PeerId};
use eth2_libp2p::{Multiaddr, PubsubMessage, RPCEvent}; use eth2_libp2p::{Libp2pEvent, PubsubMessage, RPCEvent};
use futures::prelude::*; use futures::prelude::*;
use rest_types::ValidatorSubscription; use rest_types::ValidatorSubscription;
use slog::{debug, error, info, trace}; use slog::{debug, error, info, trace};
@@ -123,8 +123,6 @@ fn spawn_service<T: BeaconChainTypes>(
// spawn on the current executor // spawn on the current executor
tokio::spawn(async move { tokio::spawn(async move {
// indicate if we have updated the listening addresses
let mut setup_listener = false;
loop { loop {
// build the futures to check simultaneously // build the futures to check simultaneously
tokio::select! { tokio::select! {
@@ -251,7 +249,8 @@ fn spawn_service<T: BeaconChainTypes>(
Some(libp2p_event) = service.libp2p.next() => { Some(libp2p_event) = service.libp2p.next() => {
// poll the swarm // poll the swarm
match libp2p_event { match libp2p_event {
Ok(BehaviourEvent::RPC(peer_id, rpc_event)) => { Libp2pEvent::Behaviour(event) => match event {
BehaviourEvent::RPC(peer_id, rpc_event) => {
// if we received a Goodbye message, drop and ban the peer // if we received a Goodbye message, drop and ban the peer
if let RPCEvent::Request(_, RPCRequest::Goodbye(_)) = rpc_event { if let RPCEvent::Request(_, RPCRequest::Goodbye(_)) = rpc_event {
//peers_to_ban.push(peer_id.clone()); //peers_to_ban.push(peer_id.clone());
@@ -267,7 +266,7 @@ fn spawn_service<T: BeaconChainTypes>(
debug!(service.log, "Failed to send RPC to router"); debug!(service.log, "Failed to send RPC to router");
}); });
} }
Ok(BehaviourEvent::PeerDialed(peer_id)) => { BehaviourEvent::PeerDialed(peer_id) => {
debug!(service.log, "Peer Dialed"; "peer_id" => format!("{}", peer_id)); debug!(service.log, "Peer Dialed"; "peer_id" => format!("{}", peer_id));
let _ = service let _ = service
.router_send .router_send
@@ -275,7 +274,7 @@ fn spawn_service<T: BeaconChainTypes>(
.map_err(|_| { .map_err(|_| {
debug!(service.log, "Failed to send peer dialed to router"); }); debug!(service.log, "Failed to send peer dialed to router"); });
} }
Ok(BehaviourEvent::PeerDisconnected(peer_id)) => { BehaviourEvent::PeerDisconnected(peer_id) => {
debug!(service.log, "Peer Disconnected"; "peer_id" => format!("{}", peer_id)); debug!(service.log, "Peer Disconnected"; "peer_id" => format!("{}", peer_id));
let _ = service let _ = service
.router_send .router_send
@@ -284,7 +283,7 @@ fn spawn_service<T: BeaconChainTypes>(
debug!(service.log, "Failed to send peer disconnect to router"); debug!(service.log, "Failed to send peer disconnect to router");
}); });
} }
Ok(BehaviourEvent::StatusPeer(peer_id)) => { BehaviourEvent::StatusPeer(peer_id) => {
let _ = service let _ = service
.router_send .router_send
.send(RouterMessage::StatusPeer(peer_id)) .send(RouterMessage::StatusPeer(peer_id))
@@ -292,12 +291,12 @@ fn spawn_service<T: BeaconChainTypes>(
debug!(service.log, "Failed to send re-status peer to router"); debug!(service.log, "Failed to send re-status peer to router");
}); });
} }
Ok(BehaviourEvent::PubsubMessage { BehaviourEvent::PubsubMessage {
id, id,
source, source,
message, message,
.. ..
}) => { } => {
match message { match message {
// attestation information gets processed in the attestation service // attestation information gets processed in the attestation service
PubsubMessage::Attestation(ref subnet_and_attestation) => { PubsubMessage::Attestation(ref subnet_and_attestation) => {
@@ -330,8 +329,14 @@ fn spawn_service<T: BeaconChainTypes>(
} }
} }
} }
Ok(BehaviourEvent::PeerSubscribed(_, _)) => {}, BehaviourEvent::PeerSubscribed(_, _) => {},
Err(_) => {} // already logged }
Libp2pEvent::NewListenAddr(multiaddr) => {
service.network_globals.listen_multiaddrs.write().push(multiaddr);
}
Libp2pEvent::ConnectionEstablished{ peer_id, endpoint: _, num_established: _ } => {
debug!(service.log, "Connection established"; "peer_id" => peer_id.to_string());
}
} }
} }
// if there is a fork update // if there is a fork update
@@ -343,16 +348,6 @@ fn spawn_service<T: BeaconChainTypes>(
service.next_fork_update = next_fork_delay(&service.beacon_chain); service.next_fork_update = next_fork_delay(&service.beacon_chain);
} }
} }
// updates the listening addresses in network globals if it has not already been
// updated
if !setup_listener {
let multi_addrs: Vec<Multiaddr> =
Swarm::listeners(&service.libp2p.swarm).cloned().collect();
if !multi_addrs.is_empty() {
*service.network_globals.listen_multiaddrs.write() = multi_addrs;
setup_listener = true
}
}
} }
}); });