diff --git a/beacon_node/network/src/attestation_service/mod.rs b/beacon_node/network/src/attestation_service/mod.rs index c44ec204ac..b71e01f413 100644 --- a/beacon_node/network/src/attestation_service/mod.rs +++ b/beacon_node/network/src/attestation_service/mod.rs @@ -5,13 +5,15 @@ use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::{types::GossipKind, MessageId, NetworkGlobals, PeerId}; use futures::prelude::*; -use hashmap_delay::HashSetDelay; +use hashset_delay::HashSetDelay; use rand::seq::SliceRandom; use rest_types::ValidatorSubscription; use slog::{crit, debug, error, o, warn}; use slot_clock::SlotClock; use std::collections::VecDeque; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use types::{Attestation, EthSpec, Slot, SubnetId}; @@ -609,57 +611,64 @@ impl AttestationService { impl Stream for AttestationService { type Item = AttServiceMessage; - type Error = (); - fn poll(&mut self) -> Poll, Self::Error> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // process any peer discovery events - while let Async::Ready(Some(exact_subnet)) = - self.discover_peers.poll().map_err(|e| { - error!(self.log, "Failed to check for peer discovery requests"; "error"=> format!("{}", e)); - })? - { - self.handle_discover_peers(exact_subnet); - } + match self.discover_peers.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(exact_subnet))) => self.handle_discover_peers(exact_subnet), + Poll::Ready(Some(Err(e))) => { + error!(self.log, "Failed to check for peer discovery requests"; "error"=> format!("{}", e)); + } + Poll::Ready(None) | Poll::Pending => {} + } // process any subscription events - while let Async::Ready(Some(exact_subnet)) = self.subscriptions.poll().map_err(|e| { - error!(self.log, "Failed to check for subnet subscription times"; "error"=> format!("{}", e)); - })? 
- { - self.handle_subscriptions(exact_subnet); - } + match self.subscriptions.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(exact_subnet))) => self.handle_subscriptions(exact_subnet), + Poll::Ready(Some(Err(e))) => { + error!(self.log, "Failed to check for subnet subscription times"; "error"=> format!("{}", e)); + } + Poll::Ready(None) | Poll::Pending => {} + } // process any un-subscription events - while let Async::Ready(Some(exact_subnet)) = self.unsubscriptions.poll().map_err(|e| { - error!(self.log, "Failed to check for subnet unsubscription times"; "error"=> format!("{}", e)); - })? - { - self.handle_unsubscriptions(exact_subnet); - } + match self.unsubscriptions.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(exact_subnet))) => self.handle_unsubscriptions(exact_subnet), + Poll::Ready(Some(Err(e))) => { + error!(self.log, "Failed to check for subnet unsubscription times"; "error"=> format!("{}", e)); + } + Poll::Ready(None) | Poll::Pending => {} + } // process any random subnet expiries - while let Async::Ready(Some(subnet)) = self.random_subnets.poll().map_err(|e| { - error!(self.log, "Failed to check for random subnet cycles"; "error"=> format!("{}", e)); - })? - { - self.handle_random_subnet_expiry(subnet); - } + match self.random_subnets.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(subnet))) => self.handle_random_subnet_expiry(subnet), + Poll::Ready(Some(Err(e))) => { + error!(self.log, "Failed to check for random subnet cycles"; "error"=> format!("{}", e)); + } + Poll::Ready(None) | Poll::Pending => {} + } // process any known validator expiries - while let Async::Ready(Some(_validator_index)) = self.known_validators.poll().map_err(|e| { - error!(self.log, "Failed to check for random subnet cycles"; "error"=> format!("{}", e)); - })? 
- { - let _ = self.handle_known_validator_expiry(); - } + match self.known_validators.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(_validator_index))) => { + let _ = self.handle_known_validator_expiry(); + } + Poll::Ready(Some(Err(e))) => { + error!(self.log, "Failed to check for known validator expiry times"; "error"=> format!("{}", e)); + } + Poll::Ready(None) | Poll::Pending => {} + } // poll to remove entries on expiration, no need to act on expiration events - let _ = self.aggregate_validators_on_subnet.poll().map_err(|e| { error!(self.log, "Failed to check for aggregate validator on subnet expirations"; "error"=> format!("{}", e)); }); + if let Poll::Ready(Some(Err(e))) = self.aggregate_validators_on_subnet.poll_next_unpin(cx) { + error!(self.log, "Failed to check for aggregate validator on subnet expirations"; "error"=> format!("{}", e)); + } // process any generated events if let Some(event) = self.events.pop_front() { - return Ok(Async::Ready(Some(event))); + return Poll::Ready(Some(event)); } - Ok(Async::NotReady) + Poll::Pending } } diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index d6a222e20c..c3e9e65e35 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -16,8 +16,7 @@ use eth2_libp2p::{ }, MessageId, NetworkGlobals, PeerId, PubsubMessage, RPCEvent, }; -use futures::future::Future; -use futures::stream::Stream; +use futures::prelude::*; use processor::Processor; use slog::{debug, o, trace, warn}; use std::sync::Arc; @@ -60,7 +59,7 @@ impl Router { beacon_chain: Arc>, network_globals: Arc>, network_send: mpsc::UnboundedSender>, - executor: &tokio::runtime::TaskExecutor, + runtime_handle: &tokio::runtime::Handle, log: slog::Logger, ) -> error::Result>> { let message_handler_log = log.new(o!("service"=> "router")); @@ -70,7 +69,7 @@ impl Router { // Initialise a message instance, which itself spawns the syncing thread. 
let processor = Processor::new( - executor, + runtime_handle, beacon_chain, network_globals, network_send.clone(), @@ -85,13 +84,12 @@ impl Router { }; // spawn handler task and move the message handler instance into the spawned thread - executor.spawn( + runtime_handle.spawn(async move { handler_recv - .for_each(move |msg| Ok(handler.handle_message(msg))) - .map_err(move |_| { - debug!(log, "Network message handler terminated."); - }), - ); + .for_each(move |msg| future::ready(handler.handle_message(msg))) + .await; + debug!(log, "Network message handler terminated."); + }); Ok(handler_send) } @@ -313,7 +311,7 @@ impl Router { /// Informs the network service that the message should be forwarded to other peers. fn propagate_message(&mut self, message_id: MessageId, propagation_source: PeerId) { self.network_send - .try_send(NetworkMessage::Propagate { + .send(NetworkMessage::Propagate { propagation_source, message_id, }) diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 79e4e1bcb7..22adcf27ef 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -44,7 +44,7 @@ pub struct Processor { impl Processor { /// Instantiate a `Processor` instance pub fn new( - executor: &tokio::runtime::TaskExecutor, + runtime_handle: &tokio::runtime::Handle, beacon_chain: Arc>, network_globals: Arc>, network_send: mpsc::UnboundedSender>, @@ -54,7 +54,7 @@ impl Processor { // spawn the sync thread let (sync_send, _sync_exit) = crate::sync::manager::spawn( - executor, + runtime_handle, beacon_chain.clone(), network_globals, network_send.clone(), @@ -71,7 +71,7 @@ impl Processor { } fn send_to_sync(&mut self, message: SyncMessage) { - self.sync_send.try_send(message).unwrap_or_else(|_| { + self.sync_send.send(message).unwrap_or_else(|_| { warn!( self.log, "Could not send message to the sync service"; @@ -914,7 +914,7 @@ impl HandlerNetworkContext { ); 
self.send_rpc_request(peer_id.clone(), RPCRequest::Goodbye(reason)); self.network_send - .try_send(NetworkMessage::Disconnect { peer_id }) + .send(NetworkMessage::Disconnect { peer_id }) .unwrap_or_else(|_| { warn!( self.log, @@ -955,7 +955,7 @@ impl HandlerNetworkContext { fn send_rpc_event(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { self.network_send - .try_send(NetworkMessage::RPC(peer_id, rpc_event)) + .send(NetworkMessage::RPC(peer_id, rpc_event)) .unwrap_or_else(|_| { warn!( self.log, diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 43db6c50fc..e50b6585f3 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -8,16 +8,15 @@ use crate::{ use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::Service as LibP2PService; use eth2_libp2p::{rpc::RPCRequest, BehaviourEvent, Enr, MessageId, NetworkGlobals, PeerId, Swarm}; -use eth2_libp2p::{PubsubMessage, RPCEvent}; +use eth2_libp2p::{Multiaddr, PubsubMessage, RPCEvent}; use futures::prelude::*; -use futures::Stream; use rest_types::ValidatorSubscription; use slog::{debug, error, info, trace}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use tokio::runtime::Handle; use tokio::sync::{mpsc, oneshot}; -use tokio::time::{delay_for, Delay}; +use tokio::time::Delay; use types::EthSpec; mod tests; @@ -42,8 +41,6 @@ pub struct NetworkService { store: Arc, /// A collection of global variables, accessible outside of the network service. network_globals: Arc>, - /// An initial delay to update variables after the libp2p service has started. - initial_delay: Delay, /// A delay that expires when a new fork takes place. next_fork_update: Option, /// The logger for the network service. 
@@ -84,10 +81,6 @@ impl NetworkService { libp2p.swarm.add_enr(enr); } - // A delay used to initialise code after the network has started - // This is currently used to obtain the listening addresses from the libp2p service. - let initial_delay = Delay::new(Instant::now() + Duration::from_secs(1)); - // launch derived network services // router task @@ -112,7 +105,6 @@ impl NetworkService { router_send, store, network_globals: network_globals.clone(), - initial_delay, next_fork_update, log: network_log, propagation_percentage, @@ -130,268 +122,239 @@ fn spawn_service( let (network_exit, mut exit_rx) = tokio::sync::oneshot::channel(); // spawn on the current executor - tokio::spawn(futures::future::poll_fn(move || -> Result<_, ()> { - let log = &service.log; - - // handles any logic which requires an initial delay - if !service.initial_delay.is_elapsed() { - if let Ok(Async::Ready(_)) = service.initial_delay.poll() { - let multi_addrs = Swarm::listeners(&service.libp2p.swarm).cloned().collect(); - *service.network_globals.listen_multiaddrs.write() = multi_addrs; - } - } - - // perform termination tasks when the network is being shutdown - if let Ok(Async::Ready(_)) | Err(_) = exit_rx.poll() { - // network thread is terminating - let enrs: Vec = service.libp2p.swarm.enr_entries().cloned().collect(); - debug!( - log, - "Persisting DHT to store"; - "Number of peers" => format!("{}", enrs.len()), - ); - - match persist_dht::(service.store.clone(), enrs) { - Err(e) => error!( - log, - "Failed to persist DHT on drop"; - "error" => format!("{:?}", e) - ), - Ok(_) => info!( - log, - "Saved DHT state"; - ), - } - - info!(log.clone(), "Network service shutdown"); - return Ok(Async::Ready(())); - } - - // processes the network channel before processing the libp2p swarm + tokio::spawn(async move { + // indicate if we have updated the listening addresses + let mut setup_listener = false; loop { - // poll the network channel - match service.network_recv.poll() { - 
Ok(Async::Ready(Some(message))) => match message { - NetworkMessage::RPC(peer_id, rpc_event) => { - trace!(log, "Sending RPC"; "rpc" => format!("{}", rpc_event)); - service.libp2p.swarm.send_rpc(peer_id, rpc_event); + // build the futures to check simultaneously + tokio::select! { + // handle network shutdown + _ = (&mut exit_rx) => { + // network thread is terminating + let enrs: Vec = service.libp2p.swarm.enr_entries().cloned().collect(); + debug!( + service.log, + "Persisting DHT to store"; + "Number of peers" => format!("{}", enrs.len()), + ); + + match persist_dht::(service.store.clone(), enrs) { + Err(e) => error!( + service.log, + "Failed to persist DHT on drop"; + "error" => format!("{:?}", e) + ), + Ok(_) => info!( + service.log, + "Saved DHT state"; + ), } - NetworkMessage::Propagate { - propagation_source, - message_id, - } => { - // TODO: Remove this for mainnet - // randomly prevents propagation - let mut should_send = true; - if let Some(percentage) = service.propagation_percentage { - // not exact percentage but close enough - let rand = rand::random::() % 100; - if rand > percentage { - // don't propagate - should_send = false; - } + + info!(service.log, "Network service shutdown"); + return; + } + // handle a message sent to the network + Some(message) = service.network_recv.recv() => { + match message { + NetworkMessage::RPC(peer_id, rpc_event) => { + trace!(service.log, "Sending RPC"; "rpc" => format!("{}", rpc_event)); + service.libp2p.swarm.send_rpc(peer_id, rpc_event); } - if !should_send { - info!(log, "Random filter did not propagate message"); - } else { - trace!(log, "Propagating gossipsub message"; - "propagation_peer" => format!("{:?}", propagation_source), - "message_id" => message_id.to_string(), - ); - service - .libp2p - .swarm - .propagate_message(&propagation_source, message_id); - } - } - NetworkMessage::Publish { messages } => { - // TODO: Remove this for mainnet - // randomly prevents propagation - let mut should_send = true; - 
if let Some(percentage) = service.propagation_percentage { - // not exact percentage but close enough - let rand = rand::random::() % 100; - if rand > percentage { - // don't propagate - should_send = false; - } - } - if !should_send { - info!(log, "Random filter did not publish messages"); - } else { - let mut topic_kinds = Vec::new(); - for message in &messages { - if !topic_kinds.contains(&message.kind()) { - topic_kinds.push(message.kind()); + NetworkMessage::Propagate { + propagation_source, + message_id, + } => { + // TODO: Remove this for mainnet + // randomly prevents propagation + let mut should_send = true; + if let Some(percentage) = service.propagation_percentage { + // not exact percentage but close enough + let rand = rand::random::() % 100; + if rand > percentage { + // don't propagate + should_send = false; } } - debug!(log, "Sending pubsub messages"; "count" => messages.len(), "topics" => format!("{:?}", topic_kinds)); - service.libp2p.swarm.publish(messages); - } - } - NetworkMessage::Disconnect { peer_id } => { - service.libp2p.disconnect_and_ban_peer( - peer_id, - std::time::Duration::from_secs(BAN_PEER_TIMEOUT), - ); - } - NetworkMessage::Subscribe { subscriptions } => { - // the result is dropped as it used solely for ergonomics - let _ = service - .attestation_service - .validator_subscriptions(subscriptions); - } - }, - Ok(Async::NotReady) => break, - Ok(Async::Ready(None)) => { - debug!(log, "Network channel closed"); - return Err(()); - } - Err(e) => { - debug!(log, "Network channel error"; "error" => format!("{}", e)); - return Err(()); - } - } - } - - // process any attestation service events - // NOTE: This must come after the network message processing as that may trigger events in - // the attestation service. 
- while let Ok(Async::Ready(Some(attestation_service_message))) = - service.attestation_service.poll() - { - match attestation_service_message { - // TODO: Implement - AttServiceMessage::Subscribe(subnet_id) => { - service.libp2p.swarm.subscribe_to_subnet(subnet_id); - } - AttServiceMessage::Unsubscribe(subnet_id) => { - service.libp2p.swarm.subscribe_to_subnet(subnet_id); - } - AttServiceMessage::EnrAdd(subnet_id) => { - service.libp2p.swarm.update_enr_subnet(subnet_id, true); - } - AttServiceMessage::EnrRemove(subnet_id) => { - service.libp2p.swarm.update_enr_subnet(subnet_id, false); - } - AttServiceMessage::DiscoverPeers(subnet_id) => { - service.libp2p.swarm.peers_request(subnet_id); - } - } - } - - let mut peers_to_ban = Vec::new(); - // poll the swarm - loop { - match service.libp2p.poll() { - Ok(Async::Ready(Some(event))) => match event { - BehaviourEvent::RPC(peer_id, rpc_event) => { - // if we received a Goodbye message, drop and ban the peer - if let RPCEvent::Request(_, RPCRequest::Goodbye(_)) = rpc_event { - peers_to_ban.push(peer_id.clone()); - }; - service - .router_send - .try_send(RouterMessage::RPC(peer_id, rpc_event)) - .map_err(|_| { - debug!(log, "Failed to send RPC to router"); - })?; - } - BehaviourEvent::PeerDialed(peer_id) => { - debug!(log, "Peer Dialed"; "peer_id" => format!("{}", peer_id)); - service - .router_send - .try_send(RouterMessage::PeerDialed(peer_id)) - .map_err(|_| { - debug!(log, "Failed to send peer dialed to router"); - })?; - } - BehaviourEvent::PeerDisconnected(peer_id) => { - debug!(log, "Peer Disconnected"; "peer_id" => format!("{}", peer_id)); - service - .router_send - .try_send(RouterMessage::PeerDisconnected(peer_id)) - .map_err(|_| { - debug!(log, "Failed to send peer disconnect to router"); - })?; - } - BehaviourEvent::StatusPeer(peer_id) => { - service - .router_send - .try_send(RouterMessage::StatusPeer(peer_id)) - .map_err(|_| { - debug!(log, "Failed to send re-status peer to router"); - })?; - } - 
BehaviourEvent::PubsubMessage { - id, - source, - message, - .. - } => { - match message { - // attestation information gets processed in the attestation service - PubsubMessage::Attestation(ref subnet_and_attestation) => { - let subnet = &subnet_and_attestation.0; - let attestation = &subnet_and_attestation.1; - // checks if we have an aggregator for the slot. If so, we process - // the attestation - if service.attestation_service.should_process_attestation( - &id, - &source, - subnet, - attestation, - ) { - service - .router_send - .try_send(RouterMessage::PubsubMessage(id, source, message)) - .map_err(|_| { - debug!(log, "Failed to send pubsub message to router"); - })?; - } - } - _ => { - // all else is sent to the router + if !should_send { + info!(service.log, "Random filter did not propagate message"); + } else { + trace!(service.log, "Propagating gossipsub message"; + "propagation_peer" => format!("{:?}", propagation_source), + "message_id" => message_id.to_string(), + ); service - .router_send - .try_send(RouterMessage::PubsubMessage(id, source, message)) - .map_err(|_| { - debug!(log, "Failed to send pubsub message to router"); - })?; + .libp2p + .swarm + .propagate_message(&propagation_source, message_id); } } - } - BehaviourEvent::PeerSubscribed(_, _) => {} - }, - Ok(Async::Ready(None)) => unreachable!("Stream never ends"), - Ok(Async::NotReady) => break, - Err(_) => break, + NetworkMessage::Publish { messages } => { + // TODO: Remove this for mainnet + // randomly prevents propagation + let mut should_send = true; + if let Some(percentage) = service.propagation_percentage { + // not exact percentage but close enough + let rand = rand::random::() % 100; + if rand > percentage { + // don't propagate + should_send = false; + } + } + if !should_send { + info!(service.log, "Random filter did not publish messages"); + } else { + let mut topic_kinds = Vec::new(); + for message in &messages { + if !topic_kinds.contains(&message.kind()) { + 
topic_kinds.push(message.kind()); + } + } + debug!(service.log, "Sending pubsub messages"; "count" => messages.len(), "topics" => format!("{:?}", topic_kinds)); + service.libp2p.swarm.publish(messages); + } + } + NetworkMessage::Disconnect { peer_id } => { + service.libp2p.disconnect_and_ban_peer( + peer_id, + std::time::Duration::from_secs(BAN_PEER_TIMEOUT), + ); + } + NetworkMessage::Subscribe { subscriptions } => { + // the result is dropped as it used solely for ergonomics + let _ = service + .attestation_service + .validator_subscriptions(subscriptions); + } + } } - } - - // ban and disconnect any peers that sent Goodbye requests - while let Some(peer_id) = peers_to_ban.pop() { - service.libp2p.disconnect_and_ban_peer( - peer_id.clone(), - std::time::Duration::from_secs(BAN_PEER_TIMEOUT), - ); - } - - // if we have just forked, update inform the libp2p layer - if let Some(mut update_fork_delay) = service.next_fork_update.take() { - if !update_fork_delay.is_elapsed() { - if let Ok(Async::Ready(_)) = update_fork_delay.poll() { - service - .libp2p - .swarm - .update_fork_version(service.beacon_chain.enr_fork_id()); - service.next_fork_update = next_fork_delay(&service.beacon_chain); + // process any attestation service events + Some(attestation_service_message) = service.attestation_service.next() => { + match attestation_service_message { + // TODO: Implement + AttServiceMessage::Subscribe(subnet_id) => { + service.libp2p.swarm.subscribe_to_subnet(subnet_id); + } + AttServiceMessage::Unsubscribe(subnet_id) => { + service.libp2p.swarm.subscribe_to_subnet(subnet_id); + } + AttServiceMessage::EnrAdd(subnet_id) => { + service.libp2p.swarm.update_enr_subnet(subnet_id, true); + } + AttServiceMessage::EnrRemove(subnet_id) => { + service.libp2p.swarm.update_enr_subnet(subnet_id, false); + } + AttServiceMessage::DiscoverPeers(subnet_id) => { + service.libp2p.swarm.peers_request(subnet_id); + } + } + } + Some(libp2p_event) = service.libp2p.next() => { + // poll the swarm 
+ match libp2p_event { + Ok(BehaviourEvent::RPC(peer_id, rpc_event)) => { + // if we received a Goodbye message, drop and ban the peer + if let RPCEvent::Request(_, RPCRequest::Goodbye(_)) = rpc_event { + //peers_to_ban.push(peer_id.clone()); + service.libp2p.disconnect_and_ban_peer( + peer_id.clone(), + std::time::Duration::from_secs(BAN_PEER_TIMEOUT), + ); + }; + let _ = service + .router_send + .send(RouterMessage::RPC(peer_id, rpc_event)) + .map_err(|_| { + debug!(service.log, "Failed to send RPC to router"); + }); + } + Ok(BehaviourEvent::PeerDialed(peer_id)) => { + debug!(service.log, "Peer Dialed"; "peer_id" => format!("{}", peer_id)); + let _ = service + .router_send + .send(RouterMessage::PeerDialed(peer_id)) + .map_err(|_| { + debug!(service.log, "Failed to send peer dialed to router"); }); + } + Ok(BehaviourEvent::PeerDisconnected(peer_id)) => { + debug!(service.log, "Peer Disconnected"; "peer_id" => format!("{}", peer_id)); + let _ = service + .router_send + .send(RouterMessage::PeerDisconnected(peer_id)) + .map_err(|_| { + debug!(service.log, "Failed to send peer disconnect to router"); + }); + } + Ok(BehaviourEvent::StatusPeer(peer_id)) => { + let _ = service + .router_send + .send(RouterMessage::StatusPeer(peer_id)) + .map_err(|_| { + debug!(service.log, "Failed to send re-status peer to router"); + }); + } + Ok(BehaviourEvent::PubsubMessage { + id, + source, + message, + .. + }) => { + match message { + // attestation information gets processed in the attestation service + PubsubMessage::Attestation(ref subnet_and_attestation) => { + let subnet = &subnet_and_attestation.0; + let attestation = &subnet_and_attestation.1; + // checks if we have an aggregator for the slot. 
If so, we process + // the attestation + if service.attestation_service.should_process_attestation( + &id, + &source, + subnet, + attestation, + ) { + let _ = service + .router_send + .send(RouterMessage::PubsubMessage(id, source, message)) + .map_err(|_| { + debug!(service.log, "Failed to send pubsub message to router"); + }); + } + } + _ => { + // all else is sent to the router + let _ = service + .router_send + .send(RouterMessage::PubsubMessage(id, source, message)) + .map_err(|_| { + debug!(service.log, "Failed to send pubsub message to router"); + }); + } + } + } + Ok(BehaviourEvent::PeerSubscribed(_, _)) => {}, + Err(_) => {} // already logged + } + } + // if there is a fork update + _ = service.next_fork_update.take().unwrap(), if service.next_fork_update.is_some() => { + service + .libp2p + .swarm + .update_fork_version(service.beacon_chain.enr_fork_id()); + service.next_fork_update = next_fork_delay(&service.beacon_chain); + } + } + // updates the listening addresses in network globals if it has not already been + // updated + if !setup_listener { + let multi_addrs: Vec = + Swarm::listeners(&service.libp2p.swarm).cloned().collect(); + if !multi_addrs.is_empty() { + *service.network_globals.listen_multiaddrs.write() = multi_addrs; + setup_listener = true } } } - - Ok(Async::NotReady) - })); + }); Ok(network_exit) } @@ -400,11 +363,11 @@ fn spawn_service( /// If there is no scheduled fork, `None` is returned. fn next_fork_delay( beacon_chain: &BeaconChain, -) -> Option { +) -> Option { beacon_chain.duration_to_next_fork().map(|until_fork| { // Add a short time-out to start within the new fork period. 
let delay = Duration::from_millis(200); - tokio::timer::Delay::new(Instant::now() + until_fork + delay) + tokio::time::delay_until(tokio::time::Instant::now() + until_fork + delay) }) } diff --git a/beacon_node/network/src/sync/block_processor.rs b/beacon_node/network/src/sync/block_processor.rs index 8c53869e40..e67f775ca2 100644 --- a/beacon_node/network/src/sync/block_processor.rs +++ b/beacon_node/network/src/sync/block_processor.rs @@ -34,7 +34,7 @@ pub fn spawn_block_processor( chain: Weak>, process_id: ProcessId, downloaded_blocks: Vec>, - mut sync_send: mpsc::UnboundedSender>, + sync_send: mpsc::UnboundedSender>, log: slog::Logger, ) { std::thread::spawn(move || { @@ -64,7 +64,7 @@ pub fn spawn_block_processor( downloaded_blocks, result, }; - sync_send.try_send(msg).unwrap_or_else(|_| { + sync_send.send(msg).unwrap_or_else(|_| { debug!( log, "Block processor could not inform range sync result. Likely shutting down." @@ -84,7 +84,7 @@ pub fn spawn_block_processor( (_, Err(e)) => { warn!(log, "Parent lookup failed"; "last_peer_id" => format!("{}", peer_id), "error" => e); sync_send - .try_send(SyncMessage::ParentLookupFailed(peer_id)) + .send(SyncMessage::ParentLookupFailed(peer_id)) .unwrap_or_else(|_| { // on failure, inform to downvote the peer debug!( diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 47fa65d881..9a378472d5 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -43,7 +43,6 @@ use eth2_libp2p::rpc::{methods::*, RequestId}; use eth2_libp2p::types::NetworkGlobals; use eth2_libp2p::PeerId; use fnv::FnvHashMap; -use futures::prelude::*; use slog::{crit, debug, error, info, trace, warn, Logger}; use smallvec::SmallVec; use std::boxed::Box; @@ -182,7 +181,7 @@ impl SingleBlockRequest { /// chain. This allows the chain to be /// dropped during the syncing process which will gracefully end the `SyncManager`. 
pub fn spawn( - executor: &tokio::runtime::TaskExecutor, + runtime_handle: &tokio::runtime::Handle, beacon_chain: Arc>, network_globals: Arc>, network_send: mpsc::UnboundedSender>, @@ -197,7 +196,7 @@ pub fn spawn( let (sync_send, sync_recv) = mpsc::unbounded_channel::>(); // create an instance of the SyncManager - let sync_manager = SyncManager { + let mut sync_manager = SyncManager { range_sync: RangeSync::new( beacon_chain.clone(), network_globals.clone(), @@ -216,14 +215,10 @@ pub fn spawn( // spawn the sync manager thread debug!(log, "Sync Manager started"); - executor.spawn( - sync_manager - .select(exit_rx.then(|_| Ok(()))) - .then(move |_| { - info!(log.clone(), "Sync Manager shutdown"); - Ok(()) - }), - ); + runtime_handle.spawn(async move { + futures::future::select(Box::pin(sync_manager.main()), exit_rx).await; + info!(log.clone(), "Sync Manager shutdown"); + }); (sync_send, sync_exit) } @@ -730,17 +725,13 @@ impl SyncManager { self.parent_queue.push(parent_request); } } -} -impl Future for SyncManager { - type Item = (); - type Error = String; - - fn poll(&mut self) -> Result, Self::Error> { + /// The main driving future for the sync manager. 
+ async fn main(&mut self) { // process any inbound messages loop { - match self.input_channel.poll() { - Ok(Async::Ready(Some(message))) => match message { + if let Some(sync_message) = self.input_channel.recv().await { + match sync_message { SyncMessage::AddPeer(peer_id, info) => { self.add_peer(peer_id, info); } @@ -792,17 +783,8 @@ impl Future for SyncManager { SyncMessage::ParentLookupFailed(peer_id) => { self.network.downvote_peer(peer_id); } - }, - Ok(Async::NotReady) => break, - Ok(Async::Ready(None)) => { - return Err("Sync manager channel closed".into()); - } - Err(e) => { - return Err(format!("Sync Manager channel error: {:?}", e)); } } } - - Ok(Async::NotReady) } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index b28164b6d5..377729adbd 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -104,7 +104,7 @@ impl SyncNetworkContext { // ignore the error if the channel send fails let _ = self.send_rpc_request(peer_id.clone(), RPCRequest::Goodbye(reason)); self.network_send - .try_send(NetworkMessage::Disconnect { peer_id }) + .send(NetworkMessage::Disconnect { peer_id }) .unwrap_or_else(|_| { warn!( self.log, @@ -130,7 +130,7 @@ impl SyncNetworkContext { rpc_event: RPCEvent, ) -> Result<(), &'static str> { self.network_send - .try_send(NetworkMessage::RPC(peer_id, rpc_event)) + .send(NetworkMessage::RPC(peer_id, rpc_event)) .map_err(|_| { debug!( self.log, diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 59c789f819..525b106f07 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -355,7 +355,7 @@ impl RangeSync { peer_id: &PeerId, ) { // if the peer is in the awaiting head mapping, remove it - self.awaiting_head_peers.remove(&peer_id); + self.awaiting_head_peers.remove(peer_id); // remove the 
peer from any peer pool self.remove_peer(network, peer_id);