Adds gossipsub object validation and verification

This commit is contained in:
Age Manning
2019-09-05 02:06:39 +10:00
parent 192380cb58
commit e7ab89a783
8 changed files with 106 additions and 47 deletions

View File

@@ -21,6 +21,8 @@ pub struct MessageHandler<T: BeaconChainTypes> {
_chain: Arc<BeaconChain<T>>,
/// The syncing framework.
sync: SimpleSync<T>,
/// A channel to the network service to allow for gossip propagation.
network_send: mpsc::UnboundedSender<NetworkMessage>,
/// The `MessageHandler` logger.
log: slog::Logger,
}
@@ -34,8 +36,9 @@ pub enum HandlerMessage {
PeerDisconnected(PeerId),
/// An RPC response/request has been received.
RPC(PeerId, RPCEvent),
/// A gossip message has been received.
PubsubMessage(PeerId, PubsubMessage),
/// A gossip message has been received. The fields are: message id, the peer that sent us this
/// message and the message itself.
PubsubMessage(String, PeerId, PubsubMessage),
}
impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
@@ -50,12 +53,13 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
let (handler_send, handler_recv) = mpsc::unbounded_channel();
// Initialise sync and begin processing in thread
let sync = SimpleSync::new(beacon_chain.clone(), network_send, &log);
let sync = SimpleSync::new(beacon_chain.clone(), network_send.clone(), &log);
// generate the Message handler
let mut handler = MessageHandler {
_chain: beacon_chain.clone(),
sync,
network_send,
log: log.clone(),
};
@@ -87,8 +91,8 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
self.handle_rpc_message(peer_id, rpc_event);
}
// An RPC message request/response has been received
HandlerMessage::PubsubMessage(peer_id, gossip) => {
self.handle_gossip(peer_id, gossip);
HandlerMessage::PubsubMessage(id, peer_id, gossip) => {
self.handle_gossip(id, peer_id, gossip);
}
}
}
@@ -194,24 +198,34 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
}
/// Handle RPC messages
fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage) {
fn handle_gossip(&mut self, id: String, peer_id: PeerId, gossip_message: PubsubMessage) {
match gossip_message {
PubsubMessage::Block(message) => match self.decode_gossip_block(message) {
Ok(block) => {
let _should_forward_on = self.sync.on_block_gossip(peer_id, block);
let should_forward_on = self.sync.on_block_gossip(peer_id.clone(), block);
// TODO: Apply more sophisticated validation and decoding logic
if should_forward_on {
self.propagate_message(id, peer_id.clone());
}
}
Err(e) => {
debug!(self.log, "Invalid gossiped beacon block"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e));
}
},
PubsubMessage::Attestation(message) => match self.decode_gossip_attestation(message) {
Ok(attestation) => self.sync.on_attestation_gossip(peer_id, attestation),
Ok(attestation) => {
// TODO: Apply more sophisticated validation and decoding logic
self.propagate_message(id, peer_id.clone());
self.sync.on_attestation_gossip(peer_id, attestation);
}
Err(e) => {
debug!(self.log, "Invalid gossiped attestation"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e));
}
},
PubsubMessage::VoluntaryExit(message) => match self.decode_gossip_exit(message) {
Ok(_exit) => {
// TODO: Apply more sophisticated validation and decoding logic
self.propagate_message(id, peer_id.clone());
// TODO: Handle exits
debug!(self.log, "Received a voluntary exit"; "peer_id" => format!("{}", peer_id) );
}
@@ -222,6 +236,8 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
PubsubMessage::ProposerSlashing(message) => {
match self.decode_gossip_proposer_slashing(message) {
Ok(_slashing) => {
// TODO: Apply more sophisticated validation and decoding logic
self.propagate_message(id, peer_id.clone());
// TODO: Handle proposer slashings
debug!(self.log, "Received a proposer slashing"; "peer_id" => format!("{}", peer_id) );
}
@@ -233,6 +249,8 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
PubsubMessage::AttesterSlashing(message) => {
match self.decode_gossip_attestation_slashing(message) {
Ok(_slashing) => {
// TODO: Apply more sophisticated validation and decoding logic
self.propagate_message(id, peer_id.clone());
// TODO: Handle attester slashings
debug!(self.log, "Received an attester slashing"; "peer_id" => format!("{}", peer_id) );
}
@@ -248,6 +266,21 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
}
}
/// Informs the network service that the message should be forwarded to other peers.
fn propagate_message(&mut self, message_id: String, propagation_source: PeerId) {
self.network_send
.try_send(NetworkMessage::Propagate {
propagation_source,
message_id,
})
.unwrap_or_else(|_| {
warn!(
self.log,
"Could not send propagation request to the network service"
)
});
}
/* Decoding of gossipsub objects from the network.
*
* The decoding is done in the message handler as it has access to to a `BeaconChain` and can