Merge message validation

This commit is contained in:
Age Manning
2019-09-05 03:06:57 +10:00
8 changed files with 106 additions and 47 deletions

View File

@@ -21,6 +21,8 @@ pub struct MessageHandler<T: BeaconChainTypes> {
_chain: Arc<BeaconChain<T>>,
/// The syncing framework.
sync: SimpleSync<T>,
/// A channel to the network service to allow for gossip propagation.
network_send: mpsc::UnboundedSender<NetworkMessage>,
/// The `MessageHandler` logger.
log: slog::Logger,
}
@@ -34,8 +36,9 @@ pub enum HandlerMessage {
PeerDisconnected(PeerId),
/// An RPC response/request has been received.
RPC(PeerId, RPCEvent),
/// A gossip message has been received.
PubsubMessage(PeerId, PubsubMessage),
/// A gossip message has been received. The fields are: message id, the peer that sent us this
/// message and the message itself.
PubsubMessage(String, PeerId, PubsubMessage),
}
impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
@@ -50,12 +53,13 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
let (handler_send, handler_recv) = mpsc::unbounded_channel();
// Initialise sync and begin processing in thread
let sync = SimpleSync::new(beacon_chain.clone(), network_send, &log);
let sync = SimpleSync::new(beacon_chain.clone(), network_send.clone(), &log);
// generate the Message handler
let mut handler = MessageHandler {
_chain: beacon_chain.clone(),
sync,
network_send,
log: log.clone(),
};
@@ -87,8 +91,8 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
self.handle_rpc_message(peer_id, rpc_event);
}
// An RPC message request/response has been received
HandlerMessage::PubsubMessage(peer_id, gossip) => {
self.handle_gossip(peer_id, gossip);
HandlerMessage::PubsubMessage(id, peer_id, gossip) => {
self.handle_gossip(id, peer_id, gossip);
}
}
}
@@ -194,24 +198,34 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
}
/// Handle RPC messages
fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage) {
fn handle_gossip(&mut self, id: String, peer_id: PeerId, gossip_message: PubsubMessage) {
match gossip_message {
PubsubMessage::Block(message) => match self.decode_gossip_block(message) {
Ok(block) => {
let _should_forward_on = self.sync.on_block_gossip(peer_id, block);
let should_forward_on = self.sync.on_block_gossip(peer_id.clone(), block);
// TODO: Apply more sophisticated validation and decoding logic
if should_forward_on {
self.propagate_message(id, peer_id.clone());
}
}
Err(e) => {
debug!(self.log, "Invalid gossiped beacon block"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e));
}
},
PubsubMessage::Attestation(message) => match self.decode_gossip_attestation(message) {
Ok(attestation) => self.sync.on_attestation_gossip(peer_id, attestation),
Ok(attestation) => {
// TODO: Apply more sophisticated validation and decoding logic
self.propagate_message(id, peer_id.clone());
self.sync.on_attestation_gossip(peer_id, attestation);
}
Err(e) => {
debug!(self.log, "Invalid gossiped attestation"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e));
}
},
PubsubMessage::VoluntaryExit(message) => match self.decode_gossip_exit(message) {
Ok(_exit) => {
// TODO: Apply more sophisticated validation and decoding logic
self.propagate_message(id, peer_id.clone());
// TODO: Handle exits
debug!(self.log, "Received a voluntary exit"; "peer_id" => format!("{}", peer_id) );
}
@@ -222,6 +236,8 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
PubsubMessage::ProposerSlashing(message) => {
match self.decode_gossip_proposer_slashing(message) {
Ok(_slashing) => {
// TODO: Apply more sophisticated validation and decoding logic
self.propagate_message(id, peer_id.clone());
// TODO: Handle proposer slashings
debug!(self.log, "Received a proposer slashing"; "peer_id" => format!("{}", peer_id) );
}
@@ -233,6 +249,8 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
PubsubMessage::AttesterSlashing(message) => {
match self.decode_gossip_attestation_slashing(message) {
Ok(_slashing) => {
// TODO: Apply more sophisticated validation and decoding logic
self.propagate_message(id, peer_id.clone());
// TODO: Handle attester slashings
debug!(self.log, "Received an attester slashing"; "peer_id" => format!("{}", peer_id) );
}
@@ -248,6 +266,21 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
}
}
/// Informs the network service that the message should be forwarded to other peers.
fn propagate_message(&mut self, message_id: String, propagation_source: PeerId) {
self.network_send
.try_send(NetworkMessage::Propagate {
propagation_source,
message_id,
})
.unwrap_or_else(|_| {
warn!(
self.log,
"Could not send propagation request to the network service"
)
});
}
/* Decoding of gossipsub objects from the network.
*
* The decoding is done in the message handler as it has access to to a `BeaconChain` and can

View File

@@ -159,12 +159,23 @@ fn network_service(
// poll the network channel
match network_recv.poll() {
Ok(Async::Ready(Some(message))) => match message {
NetworkMessage::Send(peer_id, outgoing_message) => match outgoing_message {
OutgoingMessage::RPC(rpc_event) => {
trace!(log, "{}", rpc_event);
libp2p_service.lock().swarm.send_rpc(peer_id, rpc_event);
}
},
NetworkMessage::RPC(peer_id, rpc_event) => {
trace!(log, "{}", rpc_event);
libp2p_service.lock().swarm.send_rpc(peer_id, rpc_event);
}
NetworkMessage::Propagate {
propagation_source,
message_id,
} => {
trace!(log, "Propagating gossipsub message";
"propagation_peer" => format!("{:?}", propagation_source),
"message_id" => format!("{}", message_id),
);
libp2p_service
.lock()
.swarm
.propagate_message(&propagation_source, message_id);
}
NetworkMessage::Publish { topics, message } => {
debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics));
libp2p_service.lock().swarm.publish(&topics, message);
@@ -203,13 +214,14 @@ fn network_service(
.map_err(|_| "Failed to send PeerDisconnected to handler")?;
}
Libp2pEvent::PubsubMessage {
source, message, ..
id,
source,
message,
..
} => {
//TODO: Decide if we need to propagate the topic upwards. (Potentially for
//attestations)
message_handler_send
.try_send(HandlerMessage::PubsubMessage(source, message))
.map_err(|_| " failed to send pubsub message to handler")?;
.try_send(HandlerMessage::PubsubMessage(id, source, message))
.map_err(|_| "Failed to send pubsub message to handler")?;
}
},
Ok(Async::Ready(None)) => unreachable!("Stream never ends"),
@@ -225,19 +237,16 @@ fn network_service(
/// Types of messages that the network service can receive.
#[derive(Debug)]
pub enum NetworkMessage {
/// Send a message to libp2p service.
//TODO: Define typing for messages across the wire
Send(PeerId, OutgoingMessage),
/// Publish a message to pubsub mechanism.
/// Send an RPC message to the libp2p service.
RPC(PeerId, RPCEvent),
/// Publish a message to gossipsub.
Publish {
topics: Vec<Topic>,
message: PubsubMessage,
},
}
/// Type of outgoing messages that can be sent through the network service.
#[derive(Debug)]
pub enum OutgoingMessage {
/// Send an RPC request/response.
RPC(RPCEvent),
/// Propagate a received gossipsub message
Propagate {
propagation_source: PeerId,
message_id: String,
},
}

View File

@@ -1,5 +1,5 @@
use super::manager::{ImportManager, ImportManagerOutcome};
use crate::service::{NetworkMessage, OutgoingMessage};
use crate::service::NetworkMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId};
@@ -466,7 +466,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
SHOULD_FORWARD_GOSSIP_BLOCK
}
BlockProcessingOutcome::BlockIsAlreadyKnown => SHOULD_FORWARD_GOSSIP_BLOCK,
_ => SHOULD_NOT_FORWARD_GOSSIP_BLOCK,
_ => SHOULD_NOT_FORWARD_GOSSIP_BLOCK, //TODO: Decide if we want to forward these
}
} else {
SHOULD_NOT_FORWARD_GOSSIP_BLOCK
@@ -558,12 +558,8 @@ impl NetworkContext {
}
fn send_rpc_event(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
self.send(peer_id, OutgoingMessage::RPC(rpc_event))
}
fn send(&mut self, peer_id: PeerId, outgoing_message: OutgoingMessage) {
self.network_send
.try_send(NetworkMessage::Send(peer_id, outgoing_message))
.try_send(NetworkMessage::RPC(peer_id, rpc_event))
.unwrap_or_else(|_| {
warn!(
self.log,