From 1635ae8666c92ea5bc1bd8604457dc86e2d9df3d Mon Sep 17 00:00:00 2001 From: Age Manning Date: Mon, 11 May 2020 15:34:22 +1000 Subject: [PATCH] Update network crate for new libp2p --- beacon_node/eth2-libp2p/src/lib.rs | 2 +- beacon_node/network/src/service.rs | 39 +++++++++++++----------------- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index 839986e5a0..120fb6b92f 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -23,4 +23,4 @@ pub use libp2p::{multiaddr, Multiaddr}; pub use libp2p::{PeerId, Swarm}; pub use peer_manager::{PeerDB, PeerInfo, PeerSyncStatus, SyncInfo}; pub use rpc::RPCEvent; -pub use service::{Service, NETWORK_KEY_FILENAME}; +pub use service::{Libp2pEvent, Service, NETWORK_KEY_FILENAME}; diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index e50b6585f3..02e7da2bd5 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -7,8 +7,8 @@ use crate::{ }; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::Service as LibP2PService; -use eth2_libp2p::{rpc::RPCRequest, BehaviourEvent, Enr, MessageId, NetworkGlobals, PeerId, Swarm}; -use eth2_libp2p::{Multiaddr, PubsubMessage, RPCEvent}; +use eth2_libp2p::{rpc::RPCRequest, BehaviourEvent, Enr, MessageId, NetworkGlobals, PeerId}; +use eth2_libp2p::{Libp2pEvent, PubsubMessage, RPCEvent}; use futures::prelude::*; use rest_types::ValidatorSubscription; use slog::{debug, error, info, trace}; @@ -123,8 +123,6 @@ fn spawn_service( // spawn on the current executor tokio::spawn(async move { - // indicate if we have updated the listening addresses - let mut setup_listener = false; loop { // build the futures to check simultaneously tokio::select! { @@ -251,7 +249,8 @@ fn spawn_service( Some(libp2p_event) = service.libp2p.next() => { // poll the swarm match libp2p_event { - Ok(BehaviourEvent::RPC(peer_id, rpc_event)) => { + Libp2pEvent::Behaviour(event) => match event { + BehaviourEvent::RPC(peer_id, rpc_event) => { // if we received a Goodbye message, drop and ban the peer if let RPCEvent::Request(_, RPCRequest::Goodbye(_)) = rpc_event { //peers_to_ban.push(peer_id.clone()); @@ -267,7 +266,7 @@ fn spawn_service( debug!(service.log, "Failed to send RPC to router"); }); } - Ok(BehaviourEvent::PeerDialed(peer_id)) => { + BehaviourEvent::PeerDialed(peer_id) => { debug!(service.log, "Peer Dialed"; "peer_id" => format!("{}", peer_id)); let _ = service .router_send @@ -275,7 +274,7 @@ fn spawn_service( .map_err(|_| { debug!(service.log, "Failed to send peer dialed to router"); }); } - Ok(BehaviourEvent::PeerDisconnected(peer_id)) => { + BehaviourEvent::PeerDisconnected(peer_id) => { debug!(service.log, "Peer Disconnected"; "peer_id" => format!("{}", peer_id)); let _ = service .router_send @@ -284,7 +283,7 @@ fn spawn_service( debug!(service.log, "Failed to send peer disconnect to router"); }); } - Ok(BehaviourEvent::StatusPeer(peer_id)) => { + BehaviourEvent::StatusPeer(peer_id) => { let _ = service .router_send .send(RouterMessage::StatusPeer(peer_id)) @@ -292,12 +291,12 @@ fn spawn_service( debug!(service.log, "Failed to send re-status peer to router"); }); } - Ok(BehaviourEvent::PubsubMessage { + BehaviourEvent::PubsubMessage { id, source, message, .. - }) => { + } => { match message { // attestation information gets processed in the attestation service PubsubMessage::Attestation(ref subnet_and_attestation) => { @@ -330,10 +329,16 @@ fn spawn_service( } } } - Ok(BehaviourEvent::PeerSubscribed(_, _)) => {}, - Err(_) => {} // already logged + BehaviourEvent::PeerSubscribed(_, _) => {}, } + Libp2pEvent::NewListenAddr(multiaddr) => { + service.network_globals.listen_multiaddrs.write().push(multiaddr); } + Libp2pEvent::ConnectionEstablished{ peer_id, endpoint: _, num_established: _ } => { + debug!(service.log, "Connection established"; "peer_id" => peer_id.to_string()); + } + } + } // if there is a fork update _ = service.next_fork_update.take().unwrap(), if service.next_fork_update.is_some() => { service @@ -343,16 +348,6 @@ fn spawn_service( service.next_fork_update = next_fork_delay(&service.beacon_chain); } } - // updates the listening addresses in network globals if it has not already been - // updated - if !setup_listener { - let multi_addrs: Vec = - Swarm::listeners(&service.libp2p.swarm).cloned().collect(); - if !multi_addrs.is_empty() { - *service.network_globals.listen_multiaddrs.write() = multi_addrs; - setup_listener = true - } - } } });