Add remaining network ops to queuing system (#1546)

## Issue Addressed

NA

## Proposed Changes

- Refactors the `BeaconProcessor` to remove some excessive nesting and file bloat
  - Sorry about the noise from this, it's all contained in 4d3f8c5 though.
- Adds exits, proposer slashings, attester slashings to the `BeaconProcessor` so we don't get overwhelmed with large amounts of slashings (which happened a few hours ago).

## Additional Info

NA
This commit is contained in:
Paul Hauner
2020-08-19 05:09:53 +00:00
parent 33b2a3d0e0
commit 8e7dd7b2b1
5 changed files with 1054 additions and 774 deletions

View File

@@ -16,7 +16,7 @@ use eth2_libp2p::{
};
use futures::prelude::*;
use processor::Processor;
use slog::{debug, o, trace, warn};
use slog::{debug, o, trace};
use std::sync::Arc;
use tokio::sync::mpsc;
use types::EthSpec;
@@ -26,8 +26,6 @@ use types::EthSpec;
/// passing them to the internal message processor. The message processor spawns a syncing thread
/// which manages which blocks need to be requested and processed.
pub struct Router<T: BeaconChainTypes> {
/// A channel to the network service to allow for gossip propagation.
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
/// Access to the peer db for logging.
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
/// Processes validated and decoded messages from the network. Has direct access to the
@@ -89,13 +87,12 @@ impl<T: BeaconChainTypes> Router<T> {
executor.clone(),
beacon_chain,
network_globals.clone(),
network_send.clone(),
network_send,
&log,
);
// generate the Message handler
let mut handler = Router {
network_send,
network_globals,
processor,
log: message_handler_log,
@@ -232,13 +229,7 @@ impl<T: BeaconChainTypes> Router<T> {
}
PubsubMessage::VoluntaryExit(exit) => {
debug!(self.log, "Received a voluntary exit"; "peer_id" => format!("{}", peer_id));
if let Some(verified_exit) = self
.processor
.verify_voluntary_exit_for_gossip(&peer_id, *exit)
{
self.propagate_message(id, peer_id);
self.processor.import_verified_voluntary_exit(verified_exit);
}
self.processor.on_voluntary_exit_gossip(id, peer_id, exit);
}
PubsubMessage::ProposerSlashing(proposer_slashing) => {
debug!(
@@ -246,14 +237,8 @@ impl<T: BeaconChainTypes> Router<T> {
"Received a proposer slashing";
"peer_id" => format!("{}", peer_id)
);
if let Some(verified_proposer_slashing) = self
.processor
.verify_proposer_slashing_for_gossip(&peer_id, *proposer_slashing)
{
self.propagate_message(id, peer_id);
self.processor
.import_verified_proposer_slashing(verified_proposer_slashing);
}
self.processor
.on_proposer_slashing_gossip(id, peer_id, proposer_slashing);
}
PubsubMessage::AttesterSlashing(attester_slashing) => {
debug!(
@@ -261,30 +246,9 @@ impl<T: BeaconChainTypes> Router<T> {
"Received a attester slashing";
"peer_id" => format!("{}", peer_id)
);
if let Some(verified_attester_slashing) = self
.processor
.verify_attester_slashing_for_gossip(&peer_id, *attester_slashing)
{
self.propagate_message(id, peer_id);
self.processor
.import_verified_attester_slashing(verified_attester_slashing);
}
self.processor
.on_attester_slashing_gossip(id, peer_id, attester_slashing);
}
}
}
/// Informs the network service that the message should be forwarded to other peers (is valid).
fn propagate_message(&mut self, message_id: MessageId, propagation_source: PeerId) {
self.network_send
.send(NetworkMessage::Validate {
propagation_source,
message_id,
})
.unwrap_or_else(|_| {
warn!(
self.log,
"Could not send propagation request to the network service"
)
});
}
}