mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-11 18:04:18 +00:00
Add decoding/encoding for extended gossip topics. Correct logging CLI
This commit is contained in:
@@ -10,11 +10,13 @@ use eth2_libp2p::{
|
||||
};
|
||||
use futures::future::Future;
|
||||
use futures::stream::Stream;
|
||||
use slog::{debug, warn};
|
||||
use slog::{debug, trace, warn};
|
||||
use ssz::{Decode, DecodeError};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
use types::{Attestation, BeaconBlock, BeaconBlockHeader};
|
||||
use types::{
|
||||
Attestation, AttesterSlashing, BeaconBlock, BeaconBlockHeader, ProposerSlashing, VoluntaryExit,
|
||||
};
|
||||
|
||||
/// Handles messages received from the network and client and organises syncing.
|
||||
pub struct MessageHandler<T: BeaconChainTypes> {
|
||||
@@ -49,7 +51,7 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
|
||||
executor: &tokio::runtime::TaskExecutor,
|
||||
log: slog::Logger,
|
||||
) -> error::Result<mpsc::UnboundedSender<HandlerMessage>> {
|
||||
debug!(log, "Service starting");
|
||||
trace!(log, "Service starting");
|
||||
|
||||
let (handler_send, handler_recv) = mpsc::unbounded_channel();
|
||||
|
||||
@@ -65,7 +67,6 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
|
||||
};
|
||||
|
||||
// spawn handler task
|
||||
// TODO: Handle manual termination of thread
|
||||
executor.spawn(
|
||||
handler_recv
|
||||
.for_each(move |msg| Ok(handler.handle_message(msg)))
|
||||
@@ -221,43 +222,79 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
|
||||
/// Handle various RPC errors
|
||||
fn handle_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) {
|
||||
//TODO: Handle error correctly
|
||||
warn!(self.log, "RPC Error"; "Peer" => format!("{:?}", peer_id), "Request Id" => format!("{}", request_id), "Error" => format!("{:?}", error));
|
||||
warn!(self.log, "RPC Error"; "Peer" => format!("{:?}", peer_id), "request_id" => format!("{}", request_id), "Error" => format!("{:?}", error));
|
||||
}
|
||||
|
||||
/// Handle RPC messages
|
||||
fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage) {
|
||||
match gossip_message {
|
||||
PubsubMessage::Block(message) => match self.decode_gossip_block(message) {
|
||||
Err(e) => {
|
||||
debug!(self.log, "Invalid Gossiped Beacon Block"; "Peer" => format!("{}", peer_id), "Error" => format!("{:?}", e));
|
||||
}
|
||||
Ok(block) => {
|
||||
let _should_forward_on =
|
||||
self.sync
|
||||
.on_block_gossip(peer_id, block, &mut self.network_context);
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(self.log, "Invalid gossiped beacon block"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e));
|
||||
}
|
||||
},
|
||||
PubsubMessage::Attestation(message) => match self.decode_gossip_attestation(message) {
|
||||
Err(e) => {
|
||||
debug!(self.log, "Invalid Gossiped Attestation"; "Peer" => format!("{}", peer_id), "Error" => format!("{:?}", e));
|
||||
}
|
||||
Ok(attestation) => {
|
||||
self.sync
|
||||
.on_attestation_gossip(peer_id, attestation, &mut self.network_context)
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(self.log, "Invalid gossiped attestation"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e));
|
||||
}
|
||||
},
|
||||
PubsubMessage::VoluntaryExit(message) => match self.decode_gossip_exit(message) {
|
||||
Ok(_exit) => {
|
||||
// TODO: Handle exits
|
||||
debug!(self.log, "Received a voluntary exit"; "peer_id" => format!("{}", peer_id) );
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(self.log, "Invalid gossiped exit"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e));
|
||||
}
|
||||
},
|
||||
PubsubMessage::ProposerSlashing(message) => {
|
||||
match self.decode_gossip_proposer_slashing(message) {
|
||||
Ok(_slashing) => {
|
||||
// TODO: Handle proposer slashings
|
||||
debug!(self.log, "Received a proposer slashing"; "peer_id" => format!("{}", peer_id) );
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(self.log, "Invalid gossiped proposer slashing"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e));
|
||||
}
|
||||
}
|
||||
}
|
||||
PubsubMessage::AttesterSlashing(message) => {
|
||||
match self.decode_gossip_attestation_slashing(message) {
|
||||
Ok(_slashing) => {
|
||||
// TODO: Handle attester slashings
|
||||
debug!(self.log, "Received an attester slashing"; "peer_id" => format!("{}", peer_id) );
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(self.log, "Invalid gossiped attester slashing"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e));
|
||||
}
|
||||
}
|
||||
}
|
||||
PubsubMessage::Unknown(message) => {
|
||||
// Received a message from an unknown topic. Ignore for now
|
||||
debug!(self.log, "Unknown Gossip Message"; "Peer" => format!("{}", peer_id), "Message" => format!("{:?}", message));
|
||||
debug!(self.log, "Unknown Gossip Message"; "peer_id" => format!("{}", peer_id), "Message" => format!("{:?}", message));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Decoding of blocks and attestations from the network.
|
||||
/* Decoding of gossipsub objects from the network.
|
||||
*
|
||||
* The decoding is done in the message handler as it has access to to a `BeaconChain` and can
|
||||
* therefore apply more efficient logic in decoding and verification.
|
||||
*
|
||||
* TODO: Apply efficient decoding/verification of these objects
|
||||
*/
|
||||
|
||||
/* Gossipsub Domain Decoding */
|
||||
// Note: These are not generics as type-specific verification will need to be applied.
|
||||
fn decode_gossip_block(
|
||||
&self,
|
||||
beacon_block: Vec<u8>,
|
||||
@@ -274,6 +311,29 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
|
||||
Attestation::from_ssz_bytes(&beacon_block)
|
||||
}
|
||||
|
||||
fn decode_gossip_exit(&self, voluntary_exit: Vec<u8>) -> Result<VoluntaryExit, DecodeError> {
|
||||
//TODO: Apply verification before decoding.
|
||||
VoluntaryExit::from_ssz_bytes(&voluntary_exit)
|
||||
}
|
||||
|
||||
fn decode_gossip_proposer_slashing(
|
||||
&self,
|
||||
proposer_slashing: Vec<u8>,
|
||||
) -> Result<ProposerSlashing, DecodeError> {
|
||||
//TODO: Apply verification before decoding.
|
||||
ProposerSlashing::from_ssz_bytes(&proposer_slashing)
|
||||
}
|
||||
|
||||
fn decode_gossip_attestation_slashing(
|
||||
&self,
|
||||
attester_slashing: Vec<u8>,
|
||||
) -> Result<AttesterSlashing<T::EthSpec>, DecodeError> {
|
||||
//TODO: Apply verification before decoding.
|
||||
AttesterSlashing::from_ssz_bytes(&attester_slashing)
|
||||
}
|
||||
|
||||
/* Req/Resp Domain Decoding */
|
||||
|
||||
/// Verifies and decodes the ssz-encoded block bodies received from peers.
|
||||
fn decode_block_bodies(
|
||||
&self,
|
||||
|
||||
Reference in New Issue
Block a user