Implements a new thread dedicated for syncing

This commit is contained in:
Age Manning
2019-09-07 00:28:54 +10:00
parent ee25766cae
commit 812e1fbe26
4 changed files with 514 additions and 457 deletions

View File

@@ -1,6 +1,6 @@
use crate::error;
use crate::service::NetworkMessage;
use crate::sync::SimpleSync;
use crate::sync::MessageProcessor;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::{
behaviour::PubsubMessage,
@@ -15,12 +15,16 @@ use std::sync::Arc;
use tokio::sync::mpsc;
use types::{Attestation, AttesterSlashing, BeaconBlock, ProposerSlashing, VoluntaryExit};
/// Handles messages received from the network and client and organises syncing.
/// Handles messages received from the network and client and organises syncing. This
/// functionality of this struct is to validate an decode messages from the network before
/// passing them to the internal message processor. The message processor spawns a syncing thread
/// which manages which blocks need to be requested and processed.
pub struct MessageHandler<T: BeaconChainTypes> {
/// The syncing framework.
sync: SimpleSync<T>,
/// A channel to the network service to allow for gossip propagation.
network_send: mpsc::UnboundedSender<NetworkMessage>,
/// Processes validated and decoded messages from the network. Has direct access to the
/// sync manager.
message_processor: MessageProcessor<T>,
/// The `MessageHandler` logger.
log: slog::Logger,
}
@@ -50,13 +54,15 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
trace!(log, "Service starting");
let (handler_send, handler_recv) = mpsc::unbounded_channel();
// Initialise sync and begin processing in thread
let sync = SimpleSync::new(Arc::downgrade(&beacon_chain), network_send.clone(), &log);
// Initialise a message instance, which itself spawns the syncing thread.
let message_processor =
MessageProcessor::new(executor, beacon_chain, network_send.clone(), &log);
// generate the Message handler
let mut handler = MessageHandler {
network_send,
sync,
message_processor,
log: log.clone(),
};
@@ -66,7 +72,11 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
.for_each(move |msg| Ok(handler.handle_message(msg)))
.map_err(move |_| {
debug!(log, "Network message handler terminated.");
}),
}), /*
.then(move |_| {
debug!(log.clone(), "Message handler shutdown");
}),
*/
);
Ok(handler_send)
@@ -77,11 +87,11 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
match message {
// we have initiated a connection to a peer
HandlerMessage::PeerDialed(peer_id) => {
self.sync.on_connect(peer_id);
self.message_processor.on_connect(peer_id);
}
// A peer has disconnected
HandlerMessage::PeerDisconnected(peer_id) => {
self.sync.on_disconnect(peer_id);
self.message_processor.on_disconnect(peer_id);
}
// An RPC message request/response has been received
HandlerMessage::RPC(peer_id, rpc_event) => {
@@ -109,7 +119,7 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
fn handle_rpc_request(&mut self, peer_id: PeerId, request_id: RequestId, request: RPCRequest) {
match request {
RPCRequest::Hello(hello_message) => {
self.sync
self.message_processor
.on_hello_request(peer_id, request_id, hello_message)
}
RPCRequest::Goodbye(goodbye_reason) => {
@@ -118,13 +128,13 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
"peer" => format!("{:?}", peer_id),
"reason" => format!("{:?}", goodbye_reason),
);
self.sync.on_disconnect(peer_id);
self.message_processor.on_disconnect(peer_id);
}
RPCRequest::BeaconBlocks(request) => self
.sync
.message_processor
.on_beacon_blocks_request(peer_id, request_id, request),
RPCRequest::RecentBeaconBlocks(request) => self
.sync
.message_processor
.on_recent_beacon_blocks_request(peer_id, request_id, request),
}
}
@@ -151,12 +161,13 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
RPCErrorResponse::Success(response) => {
match response {
RPCResponse::Hello(hello_message) => {
self.sync.on_hello_response(peer_id, hello_message);
self.message_processor
.on_hello_response(peer_id, hello_message);
}
RPCResponse::BeaconBlocks(response) => {
match self.decode_beacon_blocks(&response) {
Ok(beacon_blocks) => {
self.sync.on_beacon_blocks_response(
self.message_processor.on_beacon_blocks_response(
peer_id,
request_id,
beacon_blocks,
@@ -171,7 +182,7 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
RPCResponse::RecentBeaconBlocks(response) => {
match self.decode_beacon_blocks(&response) {
Ok(beacon_blocks) => {
self.sync.on_recent_beacon_blocks_response(
self.message_processor.on_recent_beacon_blocks_response(
peer_id,
request_id,
beacon_blocks,
@@ -199,7 +210,9 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
match gossip_message {
PubsubMessage::Block(message) => match self.decode_gossip_block(message) {
Ok(block) => {
let should_forward_on = self.sync.on_block_gossip(peer_id.clone(), block);
let should_forward_on = self
.message_processor
.on_block_gossip(peer_id.clone(), block);
// TODO: Apply more sophisticated validation and decoding logic
if should_forward_on {
self.propagate_message(id, peer_id.clone());
@@ -213,7 +226,8 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
Ok(attestation) => {
// TODO: Apply more sophisticated validation and decoding logic
self.propagate_message(id, peer_id.clone());
self.sync.on_attestation_gossip(peer_id, attestation);
self.message_processor
.on_attestation_gossip(peer_id, attestation);
}
Err(e) => {
debug!(self.log, "Invalid gossiped attestation"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e));