diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 6600c9e39b..7312cc6c8c 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -34,7 +34,7 @@ impl Client { pub fn new( config: ClientConfig, log: slog::Logger, - executor: TaskExecutor, + executor: &TaskExecutor, ) -> error::Result { let (exit_signal, exit) = exit_future::signal(); diff --git a/beacon_node/libp2p/src/behaviour.rs b/beacon_node/libp2p/src/behaviour.rs index 2c0371095f..96355cf3fa 100644 --- a/beacon_node/libp2p/src/behaviour.rs +++ b/beacon_node/libp2p/src/behaviour.rs @@ -1,4 +1,4 @@ -use crate::rpc::{RPCMethod, RPCRequest, RPCResponse, Rpc, RpcEvent}; +use crate::rpc::{Rpc, RpcEvent}; use futures::prelude::*; use libp2p::{ core::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, @@ -42,22 +42,7 @@ impl NetworkBehaviourEventProcess for Behaviour { fn inject_event(&mut self, event: RpcEvent) { - match event { - RpcEvent::Request { - id, - method_id, - body, - } => self.events.push(BehaviourEvent::RPCRequest { - id, - method: RPCMethod::from(method_id), - body, - }), - RpcEvent::Response { - id, - method_id, - result, - } => self.events.push(BehaviourEvent::RPCResponse { id, result }), - } + self.events.push(BehaviourEvent::RPC(event)); } } @@ -95,15 +80,7 @@ impl Behaviour { /// The types of events than can be obtained from polling the behaviour. pub enum BehaviourEvent { - RPCRequest { - id: u64, - method: RPCMethod, - body: RPCRequest, - }, - RPCResponse { - id: u64, - result: RPCResponse, - }, + RPC(RpcEvent), // TODO: This is a stub at the moment Message(String), } diff --git a/beacon_node/libp2p/src/lib.rs b/beacon_node/libp2p/src/lib.rs index 718b7fc222..69f6eb6508 100644 --- a/beacon_node/libp2p/src/lib.rs +++ b/beacon_node/libp2p/src/lib.rs @@ -5,7 +5,7 @@ pub mod behaviour; pub mod error; mod network_config; -mod rpc; +pub mod rpc; mod service; pub use libp2p::{ @@ -13,6 +13,8 @@ pub use libp2p::{ PeerId, }; pub use network_config::NetworkConfig; +pub use rpc::HelloMessage; +pub use rpc::RpcEvent; pub use service::Libp2pEvent; pub use service::Service; pub use types::multiaddr; diff --git a/beacon_node/libp2p/src/rpc/methods.rs b/beacon_node/libp2p/src/rpc/methods.rs index b6563ba649..ea9932806b 100644 --- a/beacon_node/libp2p/src/rpc/methods.rs +++ b/beacon_node/libp2p/src/rpc/methods.rs @@ -19,17 +19,17 @@ impl From for RPCMethod { #[derive(Debug, Clone)] pub enum RPCRequest { - Hello(HelloBody), + Hello(HelloMessage), } #[derive(Debug, Clone)] pub enum RPCResponse { - Hello(HelloBody), + Hello(HelloMessage), } // request/response structs for RPC methods #[derive(Encode, Decode, Clone, Debug)] -pub struct HelloBody { +pub struct HelloMessage { pub network_id: u8, pub latest_finalized_root: Hash256, pub latest_finalized_epoch: Epoch, diff --git a/beacon_node/libp2p/src/rpc/mod.rs b/beacon_node/libp2p/src/rpc/mod.rs index 4cebb1e396..3420217ce7 100644 --- a/beacon_node/libp2p/src/rpc/mod.rs +++ b/beacon_node/libp2p/src/rpc/mod.rs @@ -11,7 +11,7 @@ use libp2p::core::swarm::{ ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, }; use libp2p::{Multiaddr, PeerId}; -pub use methods::{RPCMethod, RPCRequest, RPCResponse}; +pub use methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse}; pub use protocol::{RPCProtocol, RpcEvent}; use std::marker::PhantomData; use tokio::io::{AsyncRead, AsyncWrite}; diff --git a/beacon_node/libp2p/src/rpc/protocol.rs b/beacon_node/libp2p/src/rpc/protocol.rs index 4b462bb77c..74b8322eb0 100644 --- a/beacon_node/libp2p/src/rpc/protocol.rs +++ b/beacon_node/libp2p/src/rpc/protocol.rs @@ -1,4 +1,4 @@ -use super::methods::{HelloBody, RPCMethod, RPCRequest, RPCResponse}; +use super::methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse}; use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use ssz::{ssz_encode, Decodable, Encodable, SszStream}; use std::io; @@ -78,7 +78,7 @@ fn decode(packet: Vec) -> Result { if request { let body = match RPCMethod::from(method_id) { RPCMethod::Hello => { - let (hello_body, _index) = HelloBody::ssz_decode(&packet, index)?; + let (hello_body, _index) = HelloMessage::ssz_decode(&packet, index)?; RPCRequest::Hello(hello_body) } RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod), @@ -94,7 +94,7 @@ fn decode(packet: Vec) -> Result { else { let result = match RPCMethod::from(method_id) { RPCMethod::Hello => { - let (body, _index) = HelloBody::ssz_decode(&packet, index)?; + let (body, _index) = HelloMessage::ssz_decode(&packet, index)?; RPCResponse::Hello(body) } RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod), diff --git a/beacon_node/libp2p/src/service.rs b/beacon_node/libp2p/src/service.rs index 00c11101c3..a672e153b8 100644 --- a/beacon_node/libp2p/src/service.rs +++ b/beacon_node/libp2p/src/service.rs @@ -1,6 +1,7 @@ use crate::behaviour::{Behaviour, BehaviourEvent}; use crate::error; use crate::multiaddr::Protocol; +use crate::rpc::RpcEvent; use crate::NetworkConfig; use futures::prelude::*; use futures::Stream; @@ -104,8 +105,9 @@ impl Stream for Service { debug!(self.log, "Message received: {}", m); return Ok(Async::Ready(Some(Libp2pEvent::Message(m)))); } - // TODO: Fill with all behaviour events - _ => break, + Ok(Async::Ready(Some(BehaviourEvent::RPC(event)))) => { + return Ok(Async::Ready(Some(Libp2pEvent::RPC(event)))); + } Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"), Ok(Async::NotReady) => break, _ => break, @@ -152,5 +154,7 @@ fn build_transport( /// Events that can be obtained from polling the Libp2p Service. pub enum Libp2pEvent { + // We have received an RPC event on the swarm + RPC(RpcEvent), Message(String), } diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 87935e8996..fe9780ad52 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -1,7 +1,10 @@ use crate::error; use crate::messages::NodeMessage; -use crossbeam_channel::{unbounded as channel, Sender}; -use libp2p::PeerId; +use crossbeam_channel::{unbounded as channel, Sender, TryRecvError}; +use futures::future; +use futures::prelude::*; +use libp2p::rpc; +use libp2p::{PeerId, RpcEvent}; use slog::debug; use sync::SimpleSync; use types::Hash256; @@ -11,9 +14,11 @@ pub struct MessageHandler { sync: SimpleSync, //TODO: Implement beacon chain //chain: BeaconChain + log: slog::Logger, } /// Types of messages the handler can receive. +#[derive(Debug, Clone)] pub enum HandlerMessage { /// Peer has connected. PeerConnected(PeerId), @@ -21,11 +26,16 @@ pub enum HandlerMessage { PeerDisconnected(PeerId), /// A Node message has been received. Message(PeerId, NodeMessage), + /// An RPC response/request has been received. + RPC(RpcEvent), } impl MessageHandler { /// Initializes and runs the MessageHandler. - pub fn new(log: slog::Logger) -> error::Result> { + pub fn new( + executor: &tokio::runtime::TaskExecutor, + log: slog::Logger, + ) -> error::Result> { debug!(log, "Service starting"); let (handler_send, handler_recv) = channel(); @@ -33,12 +43,29 @@ impl MessageHandler { // Initialise sync and begin processing in thread //TODO: Load genesis from BeaconChain let temp_genesis = Hash256::zero(); + + // generate the Message handler let sync = SimpleSync::new(temp_genesis); + //TODO: Initialise beacon chain + let mut handler = MessageHandler { + sync, + log: log.clone(), + }; - let handler = MessageHandler { sync }; - - // spawn handler thread + // spawn handler task + // TODO: Handle manual termination of thread + executor.spawn(future::poll_fn(move || -> Result<_, _> { + loop { + handler.handle_message(handler_recv.recv().map_err(|_| { + debug!(log, "Handler channel closed. Handler terminating"); + })?); + } + })); Ok(handler_send) } + + fn handle_message(&mut self, message: HandlerMessage) { + debug!(self.log, "Message received {:?}", message); + } } diff --git a/beacon_node/network/src/messages.rs b/beacon_node/network/src/messages.rs index d3a83fd5c8..064424a87f 100644 --- a/beacon_node/network/src/messages.rs +++ b/beacon_node/network/src/messages.rs @@ -1,27 +1,16 @@ use libp2p::PeerId; +use libp2p::{HelloMessage, RpcEvent}; use types::{Hash256, Slot}; /// Messages between nodes across the network. #[derive(Debug, Clone)] pub enum NodeMessage { - Status(Status), + RPC(RpcEvent), BlockRequest, // TODO: only for testing - remove Message(String), } -#[derive(Debug, Clone)] -pub struct Status { - /// Current node version. - version: u8, - /// Genesis Hash. - genesis_hash: Hash256, - /// Best known slot number. - best_slot: Slot, - /// Best known slot hash. - best_slot_hash: Hash256, -} - /// Types of messages that the network service can receive. #[derive(Debug, Clone)] pub enum NetworkMessage { diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index e75b7e49a8..bd01027e96 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -28,12 +28,12 @@ pub struct Service { impl Service { pub fn new( config: NetworkConfig, - executor: TaskExecutor, + executor: &TaskExecutor, log: slog::Logger, ) -> error::Result<(Arc, Sender)> { // launch message handler thread let message_handler_log = log.new(o!("Service" => "MessageHandler")); - let message_handler_send = MessageHandler::new(message_handler_log)?; + let message_handler_send = MessageHandler::new(executor, message_handler_log)?; // launch libp2p service let libp2p_log = log.new(o!("Service" => "Libp2p")); @@ -61,7 +61,7 @@ impl Service { fn spawn_service( libp2p_service: LibP2PService, message_handler_send: crossbeam_channel::Sender, - executor: TaskExecutor, + executor: &TaskExecutor, log: slog::Logger, ) -> error::Result<( crossbeam_channel::Sender, @@ -99,6 +99,15 @@ fn network_service( // poll the swarm loop { match libp2p_service.poll() { + Ok(Async::Ready(Some(Libp2pEvent::RPC(rpc_event)))) => { + debug!( + libp2p_service.log, + "RPC Event: Rpc message received: {:?}", rpc_event + ); + message_handler_send + .send(HandlerMessage::RPC(rpc_event)) + .map_err(|_| "failed to send rpc to handler"); + } Ok(Async::Ready(Some(Libp2pEvent::Message(m)))) => debug!( libp2p_service.log, "Network Service: Message received: {}", m diff --git a/beacon_node/src/run.rs b/beacon_node/src/run.rs index 12d761d84c..810f2aeafd 100644 --- a/beacon_node/src/run.rs +++ b/beacon_node/src/run.rs @@ -32,7 +32,7 @@ pub fn run_beacon_node(config: ClientConfig, log: slog::Logger) -> error::Result let executor = runtime.executor(); // currently testing - using TestingNode type - let client: Client = Client::new(config, log.clone(), executor.clone())?; + let client: Client = Client::new(config, log.clone(), &executor)?; notifier::run(&client, executor, exit); runtime.block_on(ctrlc);