Add first changes to syncing logic

- Adds testing framework
- Breaks out new `NetworkContext` object
This commit is contained in:
Paul Hauner
2019-03-21 17:17:01 +11:00
parent 4105b869e1
commit ca18d4390a
14 changed files with 513 additions and 82 deletions

View File

@@ -5,32 +5,28 @@ use crate::sync::SimpleSync;
use crossbeam_channel::{unbounded as channel, Sender};
use futures::future;
use libp2p::{
rpc::{RPCMethod, RPCRequest, RPCResponse},
rpc::{RPCRequest, RPCResponse},
HelloMessage, PeerId, RPCEvent,
};
use slog::debug;
use slog::warn;
use slog::{debug, trace};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Instant;
/// Timeout for RPC requests.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
// const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
/// Timeout before banning a peer for non-identification.
const HELLO_TIMEOUT: Duration = Duration::from_secs(30);
// const HELLO_TIMEOUT: Duration = Duration::from_secs(30);
/// Handles messages received from the network and client and organises syncing.
pub struct MessageHandler {
/// Currently loaded and initialised beacon chain.
chain: Arc<BeaconChain>,
_chain: Arc<BeaconChain>,
/// The syncing framework.
sync: SimpleSync,
/// The network channel to relay messages to the Network service.
network_send: crossbeam_channel::Sender<NetworkMessage>,
/// A mapping of peers and the RPC id we have sent an RPC request to.
requests: HashMap<(PeerId, u64), Instant>,
/// A counter of request id for each peer.
request_ids: HashMap<PeerId, u64>,
/// The context required to send messages to, and process messages from peers.
network_context: NetworkContext,
/// The `MessageHandler` logger.
log: slog::Logger,
}
@@ -65,13 +61,9 @@ impl MessageHandler {
let sync = SimpleSync::new(beacon_chain.clone(), &log);
let mut handler = MessageHandler {
// TODO: The handler may not need a chain, perhaps only sync?
chain: beacon_chain.clone(),
_chain: beacon_chain.clone(),
sync,
network_send,
requests: HashMap::new(),
request_ids: HashMap::new(),
network_context: NetworkContext::new(network_send, log.clone()),
log: log.clone(),
};
@@ -93,8 +85,7 @@ impl MessageHandler {
match message {
// we have initiated a connection to a peer
HandlerMessage::PeerDialed(peer_id) => {
let id = self.generate_request_id(&peer_id);
self.send_hello(peer_id, id, true);
self.sync.on_connect(&peer_id, &mut self.network_context);
}
// we have received an RPC message request/response
HandlerMessage::RPC(peer_id, rpc_event) => {
@@ -118,9 +109,11 @@ impl MessageHandler {
/// A new RPC request has been received from the network.
fn handle_rpc_request(&mut self, peer_id: PeerId, id: u64, request: RPCRequest) {
// TODO: ensure the id is legit
match request {
RPCRequest::Hello(hello_message) => {
self.handle_hello_request(peer_id, id, hello_message)
self.sync
.on_hello(&peer_id, hello_message, &mut self.network_context)
}
// TODO: Handle all requests
_ => {}
@@ -131,7 +124,12 @@ impl MessageHandler {
// we match on id and ignore responses past the timeout.
fn handle_rpc_response(&mut self, peer_id: PeerId, id: u64, response: RPCResponse) {
// if response id is related to a request, ignore (likely RPC timeout)
if self.requests.remove(&(peer_id.clone(), id)).is_none() {
if self
.network_context
.requests
.remove(&(peer_id.clone(), id))
.is_none()
{
debug!(self.log, "Unrecognized response from peer: {:?}", peer_id);
return;
}
@@ -145,16 +143,10 @@ impl MessageHandler {
}
}
/// Handle a HELLO RPC request message.
fn handle_hello_request(&mut self, peer_id: PeerId, id: u64, hello_message: HelloMessage) {
// send back a HELLO message
self.send_hello(peer_id.clone(), id, false);
// validate the peer
self.validate_hello(peer_id, hello_message);
}
/// Validate a HELLO RPC message.
fn validate_hello(&mut self, peer_id: PeerId, message: HelloMessage) {
self.sync
.on_hello(&peer_id, message.clone(), &mut self.network_context);
// validate the peer
if !self.sync.validate_peer(peer_id.clone(), message) {
debug!(
@@ -164,8 +156,68 @@ impl MessageHandler {
//TODO: block/ban the peer
}
}
}
/* General RPC helper functions */
pub struct NetworkContext {
/// The network channel to relay messages to the Network service.
network_send: crossbeam_channel::Sender<NetworkMessage>,
/// A mapping of peers and the RPC id we have sent an RPC request to.
requests: HashMap<(PeerId, u64), Instant>,
/// A counter of request id for each peer.
request_ids: HashMap<PeerId, u64>,
/// The `MessageHandler` logger.
log: slog::Logger,
}
impl NetworkContext {
pub fn new(network_send: crossbeam_channel::Sender<NetworkMessage>, log: slog::Logger) -> Self {
Self {
network_send,
requests: HashMap::new(),
request_ids: HashMap::new(),
log,
}
}
pub fn send_rpc_request(&mut self, peer_id: PeerId, rpc_request: RPCRequest) {
let id = self.generate_request_id(&peer_id);
self.send_rpc_event(
peer_id,
RPCEvent::Request {
id,
method_id: rpc_request.method_id(),
body: rpc_request,
},
);
}
pub fn send_rpc_response(&mut self, peer_id: PeerId, rpc_response: RPCResponse) {
let id = self.generate_request_id(&peer_id);
self.send_rpc_event(
peer_id,
RPCEvent::Response {
id,
method_id: rpc_response.method_id(),
result: rpc_response,
},
);
}
fn send_rpc_event(&self, peer_id: PeerId, rpc_event: RPCEvent) {
self.send(peer_id, OutgoingMessage::RPC(rpc_event))
}
fn send(&self, peer_id: PeerId, outgoing_message: OutgoingMessage) {
self.network_send
.send(NetworkMessage::Send(peer_id, outgoing_message))
.unwrap_or_else(|_| {
warn!(
self.log,
"Could not send RPC message to the network service"
)
});
//
}
/// Generates a new request id for a peer.
fn generate_request_id(&mut self, peer_id: &PeerId) -> u64 {
@@ -185,41 +237,4 @@ impl MessageHandler {
);
id
}
/// Sends a HELLO RPC request or response to a newly connected peer.
//TODO: The boolean determines if sending request/respond, will be cleaner in the RPC re-write
fn send_hello(&mut self, peer_id: PeerId, id: u64, is_request: bool) {
let rpc_event = if is_request {
RPCEvent::Request {
id,
method_id: RPCMethod::Hello.into(),
body: RPCRequest::Hello(self.sync.generate_hello()),
}
} else {
RPCEvent::Response {
id,
method_id: RPCMethod::Hello.into(),
result: RPCResponse::Hello(self.sync.generate_hello()),
}
};
// send the hello request to the network
trace!(self.log, "Sending HELLO message to peer {:?}", peer_id);
self.send_rpc(peer_id, rpc_event);
}
/// Sends an RPC request/response to the network server.
fn send_rpc(&self, peer_id: PeerId, rpc_event: RPCEvent) {
self.network_send
.send(NetworkMessage::Send(
peer_id,
OutgoingMessage::RPC(rpc_event),
))
.unwrap_or_else(|_| {
warn!(
self.log,
"Could not send RPC message to the network service"
)
});
}
}