From 495348f934895a2e0057e47889da1010ecd41ced Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 19 Mar 2019 11:25:42 +1100 Subject: [PATCH] Adds RPC request send framework in message handler --- beacon_node/network/src/message_handler.rs | 51 +++++++++++++++++----- 1 file changed, 41 insertions(+), 10 deletions(-) diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 7e5a74a102..2d4d47b86b 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -6,15 +6,18 @@ use crate::sync::SimpleSync; use crossbeam_channel::{unbounded as channel, Sender}; use futures::future; use futures::prelude::*; -use libp2p::{PeerId, RPCEvent}; +use libp2p::{ + rpc::{RPCRequest, RPCResponse}, + PeerId, RPCEvent, +}; use slog::debug; use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; use types::Hash256; -/// Timeout for establishing a HELLO handshake. -const HELLO_TIMEOUT: Duration = Duration::from_secs(30); +/// Timeout for RPC requests. +const REQUEST_TIMEOUT: Duration = Duration::from_secs(30); /// Handles messages received from the network and client and organises syncing. pub struct MessageHandler { @@ -24,12 +27,22 @@ pub struct MessageHandler { sync: SimpleSync, /// The network channel to relay messages to the Network service. network_send: crossbeam_channel::Sender, - /// A mapping of peers we have sent a HELLO rpc request to. - hello_requests: HashMap, + /// A mapping of peers we have sent an RPC request to. + requests: HashMap>, + /// A counter of request id for each peer. + request_ids: HashMap, /// The `MessageHandler` logger. log: slog::Logger, } +/// RPC request information +pub struct RPCRequestInfo { + /// The id of the request + id: u16, + /// The time the request was sent, to check ttl. + request_time: Instant, +} + /// Types of messages the handler can receive. #[derive(Debug, Clone)] pub enum HandlerMessage { @@ -64,7 +77,8 @@ impl MessageHandler { chain: beacon_chain.clone(), sync, network_send, - hello_requests: HashMap::new(), + requests: HashMap::new(), + request_ids: HashMap::new(), log: log.clone(), }; @@ -84,8 +98,6 @@ impl MessageHandler { fn handle_message(&mut self, message: HandlerMessage) { match message { HandlerMessage::PeerDialed(peer_id) => { - // register RPC request - self.hello_requests.insert(peer_id.clone(), Instant::now()); self.send_hello(peer_id); } //TODO: Handle all messages @@ -94,8 +106,27 @@ impl MessageHandler { } /// Sends a HELLO RPC request to a newly connected peer. - fn send_hello(&self, peer_id: PeerId) { + fn send_hello(&mut self, peer_id: PeerId) { + // generate a unique id for the peer + let id = { + let borrowed_id = self.request_ids.entry(peer_id.clone()).or_insert_with(|| 0); + let id = borrowed_id.clone(); + //increment the counter + *borrowed_id += 1; + id + }; + // register RPC request + { + let requests = self.requests.entry(peer_id).or_insert_with(|| vec![]); + requests.push(RPCRequestInfo { + id: id.clone(), + request_time: Instant::now(), + }); + } // send the hello request to the network - //sync.hello() + self.send_rpc_request(id, RPCResponse::Hello(self.sync.generate_hello())); } + + /// Sends and RPC response + fn send_rpc_request(&self, request_id: u16, response: RPCResponse) {} }