diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index 987f064a4a..ff6c1b230c 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -8,7 +8,7 @@ use tokio::runtime::TaskExecutor; use tokio::timer::Interval; /// The interval between heartbeat events. -pub const HEARTBEAT_INTERVAL_SECONDS: u64 = 5; +pub const HEARTBEAT_INTERVAL_SECONDS: u64 = 15; /// Spawns a thread that can be used to run code periodically, on `HEARTBEAT_INTERVAL_SECONDS` /// durations. @@ -25,19 +25,22 @@ pub fn run( Duration::from_secs(HEARTBEAT_INTERVAL_SECONDS), ); - let _log = client.log.new(o!("Service" => "Notifier")); + let log = client.log.new(o!("Service" => "Notifier")); + + let libp2p = client.network.libp2p_service(); + + let heartbeat = move |_| { + // Notify the number of connected nodes + // Panic if libp2p is poisoned + debug!(log, ""; "Connected Peers" => libp2p.lock().swarm.connected_peers()); - let heartbeat = |_| { - // There is not presently any heartbeat logic. - // - // We leave this function empty for future use. Ok(()) }; // map error and spawn - let log = client.log.clone(); + let err_log = client.log.clone(); let heartbeat_interval = interval - .map_err(move |e| debug!(log, "Timer error {}", e)) + .map_err(move |e| debug!(err_log, "Timer error {}", e)) .for_each(heartbeat); executor.spawn(exit.until(heartbeat_interval).map(|_| ())); diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 1bff58ecdf..9a30a60b96 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -171,6 +171,11 @@ impl Behaviour { pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { self.serenity_rpc.send_rpc(peer_id, rpc_event); } + + /* Discovery / Peer management functions */ + pub fn connected_peers(&self) -> usize { + self.discovery.connected_peers() + } } /// The types of events than can be obtained from polling the behaviour. diff --git a/beacon_node/eth2-libp2p/src/discovery.rs b/beacon_node/eth2-libp2p/src/discovery.rs index 49e5dbeb56..29725ff36d 100644 --- a/beacon_node/eth2-libp2p/src/discovery.rs +++ b/beacon_node/eth2-libp2p/src/discovery.rs @@ -106,6 +106,11 @@ impl Discovery { self.discovery.add_enr(enr); } + /// The current number of connected libp2p peers. + pub fn connected_peers(&self) -> usize { + self.connected_peers.len() + } + /// Search for new peers using the underlying discovery mechanism. fn find_peers(&mut self) { // pick a random NodeId diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 1499ac5808..9eadede76e 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -19,3 +19,4 @@ tree_hash = { path = "../../eth2/utils/tree_hash" } futures = "0.1.25" error-chain = "0.12.0" tokio = "0.1.16" +parking_lot = "0.9.0" diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 2395470781..b1d88415c3 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -105,7 +105,7 @@ impl MessageHandler { fn handle_rpc_message(&mut self, peer_id: PeerId, rpc_message: RPCEvent) { match rpc_message { RPCEvent::Request(id, req) => self.handle_rpc_request(peer_id, id, req), - RPCEvent::Response(id, resp) => self.handle_rpc_response(peer_id, id, resp), + RPCEvent::Response(_id, resp) => self.handle_rpc_response(peer_id, resp), RPCEvent::Error(id, error) => self.handle_rpc_error(peer_id, id, error), } } @@ -148,18 +148,10 @@ impl MessageHandler { /// An RPC response has been received from the network. // we match on id and ignore responses past the timeout. - fn handle_rpc_response( - &mut self, - peer_id: PeerId, - id: RequestId, - error_response: RPCErrorResponse, - ) { + fn handle_rpc_response(&mut self, peer_id: PeerId, error_response: RPCErrorResponse) { // an error could have occurred. // TODO: Handle Error gracefully match error_response { - RPCErrorResponse::EncodingError => { - warn!(self.log, "Encoding Error"; "peer" => format!("{:?}", peer_id), "request_id" => format!("{}",id)) - } RPCErrorResponse::InvalidRequest(error) => { warn!(self.log, "";"peer" => format!("{:?}", peer_id), "Invalid Request" => error.as_string()) } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 0c0af367a8..a771f8add4 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -8,6 +8,7 @@ use eth2_libp2p::{Libp2pEvent, PeerId}; use eth2_libp2p::{PubsubMessage, RPCEvent}; use futures::prelude::*; use futures::Stream; +use parking_lot::Mutex; use slog::{debug, info, o, trace}; use std::marker::PhantomData; use std::sync::Arc; @@ -16,9 +17,9 @@ use tokio::sync::{mpsc, oneshot}; /// Service that handles communication between internal services and the eth2_libp2p network service. pub struct Service { - //libp2p_service: Arc>, + libp2p_service: Arc>, _libp2p_exit: oneshot::Sender<()>, - network_send: mpsc::UnboundedSender, + _network_send: mpsc::UnboundedSender, _phantom: PhantomData, //message_handler: MessageHandler, //message_handler_send: Sender } @@ -43,38 +44,33 @@ impl Service { // launch libp2p service let libp2p_log = log.new(o!("Service" => "Libp2p")); - let libp2p_service = LibP2PService::new(config.clone(), libp2p_log)?; + let libp2p_service = Arc::new(Mutex::new(LibP2PService::new(config.clone(), libp2p_log)?)); // TODO: Spawn thread to handle libp2p messages and pass to message handler thread. let libp2p_exit = spawn_service( - libp2p_service, + libp2p_service.clone(), network_recv, message_handler_send, executor, log, )?; let network_service = Service { + libp2p_service, _libp2p_exit: libp2p_exit, - network_send: network_send.clone(), + _network_send: network_send.clone(), _phantom: PhantomData, }; Ok((Arc::new(network_service), network_send)) } - // TODO: Testing only - pub fn send_message(&mut self) { - self.network_send - .try_send(NetworkMessage::Send( - PeerId::random(), - OutgoingMessage::NotifierTest, - )) - .unwrap(); + pub fn libp2p_service(&self) -> Arc> { + self.libp2p_service.clone() } } fn spawn_service( - libp2p_service: LibP2PService, + libp2p_service: Arc>, network_recv: mpsc::UnboundedReceiver, message_handler_send: mpsc::UnboundedSender, executor: &TaskExecutor, @@ -103,7 +99,7 @@ fn spawn_service( //TODO: Potentially handle channel errors fn network_service( - mut libp2p_service: LibP2PService, + libp2p_service: Arc>, mut network_recv: mpsc::UnboundedReceiver, mut message_handler_send: mpsc::UnboundedSender, log: slog::Logger, @@ -115,28 +111,18 @@ fn network_service( not_ready_count = 0; // poll the network channel match network_recv.poll() { - Ok(Async::Ready(Some(message))) => { - match message { - // TODO: Testing message - remove - NetworkMessage::Send(peer_id, outgoing_message) => { - match outgoing_message { - OutgoingMessage::RPC(rpc_event) => { - trace!(log, "Sending RPC Event: {:?}", rpc_event); - //TODO: Make swarm private - //TODO: Implement correct peer id topic message handling - libp2p_service.swarm.send_rpc(peer_id, rpc_event); - } - OutgoingMessage::NotifierTest => { - // debug!(log, "Received message from notifier"); - } - }; - } - NetworkMessage::Publish { topics, message } => { - debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics)); - libp2p_service.swarm.publish(topics, *message); + Ok(Async::Ready(Some(message))) => match message { + NetworkMessage::Send(peer_id, outgoing_message) => match outgoing_message { + OutgoingMessage::RPC(rpc_event) => { + trace!(log, "Sending RPC Event: {:?}", rpc_event); + libp2p_service.lock().swarm.send_rpc(peer_id, rpc_event); } + }, + NetworkMessage::Publish { topics, message } => { + debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics)); + libp2p_service.lock().swarm.publish(topics, *message); } - } + }, Ok(Async::NotReady) => not_ready_count += 1, Ok(Async::Ready(None)) => { return Err(eth2_libp2p::error::Error::from("Network channel closed")); @@ -147,7 +133,7 @@ fn network_service( } // poll the swarm - match libp2p_service.poll() { + match libp2p_service.lock().poll() { Ok(Async::Ready(Some(event))) => match event { Libp2pEvent::RPC(peer_id, rpc_event) => { trace!(log, "RPC Event: RPC message received: {:?}", rpc_event); @@ -182,6 +168,7 @@ fn network_service( Err(_) => not_ready_count += 1, } } + Ok(Async::NotReady) }) } @@ -204,6 +191,4 @@ pub enum NetworkMessage { pub enum OutgoingMessage { /// Send an RPC request/response. RPC(RPCEvent), - //TODO: Remove - NotifierTest, }