diff --git a/beacon_node/libp2p/src/rpc/methods.rs b/beacon_node/libp2p/src/rpc/methods.rs index ea9932806b..c99994b7ce 100644 --- a/beacon_node/libp2p/src/rpc/methods.rs +++ b/beacon_node/libp2p/src/rpc/methods.rs @@ -17,6 +17,15 @@ impl From for RPCMethod { } } +impl Into for RPCMethod { + fn into(self) -> u16 { + match self { + RPCMethod::Hello => 0, + _ => 0, + } + } +} + #[derive(Debug, Clone)] pub enum RPCRequest { Hello(HelloMessage), diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 2d4d47b86b..dcc1452948 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -1,16 +1,17 @@ use crate::beacon_chain::BeaconChain; use crate::error; use crate::messages::NodeMessage; -use crate::service::NetworkMessage; +use crate::service::{NetworkMessage, OutgoingMessage}; use crate::sync::SimpleSync; use crossbeam_channel::{unbounded as channel, Sender}; use futures::future; use futures::prelude::*; use libp2p::{ - rpc::{RPCRequest, RPCResponse}, + rpc::{RPCMethod, RPCRequest, RPCResponse}, PeerId, RPCEvent, }; use slog::debug; +use slog::warn; use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -30,7 +31,7 @@ pub struct MessageHandler { /// A mapping of peers we have sent an RPC request to. requests: HashMap>, /// A counter of request id for each peer. - request_ids: HashMap, + request_ids: HashMap, /// The `MessageHandler` logger. log: slog::Logger, } @@ -38,7 +39,7 @@ pub struct MessageHandler { /// RPC request information pub struct RPCRequestInfo { /// The id of the request - id: u16, + id: u64, /// The time the request was sent, to check ttl. request_time: Instant, } @@ -98,7 +99,7 @@ impl MessageHandler { fn handle_message(&mut self, message: HandlerMessage) { match message { HandlerMessage::PeerDialed(peer_id) => { - self.send_hello(peer_id); + self.send_hello_request(peer_id); } //TODO: Handle all messages _ => {} @@ -106,7 +107,7 @@ impl MessageHandler { } /// Sends a HELLO RPC request to a newly connected peer. - fn send_hello(&mut self, peer_id: PeerId) { + fn send_hello_request(&mut self, peer_id: PeerId) { // generate a unique id for the peer let id = { let borrowed_id = self.request_ids.entry(peer_id.clone()).or_insert_with(|| 0); @@ -117,16 +118,39 @@ impl MessageHandler { }; // register RPC request { - let requests = self.requests.entry(peer_id).or_insert_with(|| vec![]); + let requests = self + .requests + .entry(peer_id.clone()) + .or_insert_with(|| vec![]); requests.push(RPCRequestInfo { id: id.clone(), request_time: Instant::now(), }); } + + // build the rpc request + let rpc_event = RPCEvent::Request { + id, + method_id: RPCMethod::Hello.into(), + body: RPCRequest::Hello(self.sync.generate_hello()), + }; + // send the hello request to the network - self.send_rpc_request(id, RPCResponse::Hello(self.sync.generate_hello())); + self.send_rpc(peer_id, rpc_event); } /// Sends and RPC response - fn send_rpc_request(&self, request_id: u16, response: RPCResponse) {} + fn send_rpc(&self, peer_id: PeerId, rpc_event: RPCEvent) { + self.network_send + .send(NetworkMessage::Send( + peer_id, + OutgoingMessage::RPC(rpc_event), + )) + .unwrap_or_else(|_| { + warn!( + self.log, + "Could not send RPC message to the network service" + ) + }); + } }