Updates to latest interop branch.

- Shifts decoding of objects into message handler.
- Updates to latest interop gossipsub.
- Adds interop spec constant.
This commit is contained in:
Age Manning
2019-08-06 17:54:38 +10:00
parent 0613bc16fc
commit 107bbdcccd
16 changed files with 199 additions and 157 deletions

View File

@@ -14,7 +14,7 @@ use slog::{debug, warn};
use ssz::{Decode, DecodeError};
use std::sync::Arc;
use tokio::sync::mpsc;
use types::{BeaconBlockHeader, EthSpec};
use types::{Attestation, BeaconBlock, BeaconBlockHeader};
/// Handles messages received from the network and client and organises syncing.
pub struct MessageHandler<T: BeaconChainTypes> {
@@ -23,14 +23,14 @@ pub struct MessageHandler<T: BeaconChainTypes> {
/// The syncing framework.
sync: SimpleSync<T>,
/// The context required to send messages to, and process messages from peers.
network_context: NetworkContext<T::EthSpec>,
network_context: NetworkContext,
/// The `MessageHandler` logger.
log: slog::Logger,
}
/// Types of messages the handler can receive.
#[derive(Debug)]
pub enum HandlerMessage<E: EthSpec> {
pub enum HandlerMessage {
/// We have initiated a connection to a new peer.
PeerDialed(PeerId),
/// Peer has disconnected,
@@ -38,17 +38,17 @@ pub enum HandlerMessage<E: EthSpec> {
/// An RPC response/request has been received.
RPC(PeerId, RPCEvent),
/// A gossip message has been received.
PubsubMessage(PeerId, Box<PubsubMessage<E>>),
PubsubMessage(PeerId, PubsubMessage),
}
impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
/// Initializes and runs the MessageHandler.
pub fn spawn(
beacon_chain: Arc<BeaconChain<T>>,
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
network_send: mpsc::UnboundedSender<NetworkMessage>,
executor: &tokio::runtime::TaskExecutor,
log: slog::Logger,
) -> error::Result<mpsc::UnboundedSender<HandlerMessage<T::EthSpec>>> {
) -> error::Result<mpsc::UnboundedSender<HandlerMessage>> {
debug!(log, "Service starting");
let (handler_send, handler_recv) = mpsc::unbounded_channel();
@@ -78,7 +78,7 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
}
/// Handle all messages incoming from the network service.
fn handle_message(&mut self, message: HandlerMessage<T::EthSpec>) {
fn handle_message(&mut self, message: HandlerMessage) {
match message {
// we have initiated a connection to a peer
HandlerMessage::PeerDialed(peer_id) => {
@@ -94,7 +94,7 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
}
// we have received an RPC message request/response
HandlerMessage::PubsubMessage(peer_id, gossip) => {
self.handle_gossip(peer_id, *gossip);
self.handle_gossip(peer_id, gossip);
}
}
}
@@ -218,6 +218,62 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
}
}
/// Handle various RPC errors
fn handle_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) {
//TODO: Handle error correctly
warn!(self.log, "RPC Error"; "Peer" => format!("{:?}", peer_id), "Request Id" => format!("{}", request_id), "Error" => format!("{:?}", error));
}
/// Handle RPC messages
fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage) {
match gossip_message {
PubsubMessage::Block(message) => match self.decode_gossip_block(message) {
Err(e) => {
debug!(self.log, "Invalid Gossiped Beacon Block"; "Peer" => format!("{}", peer_id), "Error" => format!("{:?}", e));
}
Ok(block) => {
let _should_forward_on =
self.sync
.on_block_gossip(peer_id, block, &mut self.network_context);
}
},
PubsubMessage::Attestation(message) => match self.decode_gossip_attestation(message) {
Err(e) => {
debug!(self.log, "Invalid Gossiped Attestation"; "Peer" => format!("{}", peer_id), "Error" => format!("{:?}", e));
}
Ok(attestation) => {
self.sync
.on_attestation_gossip(peer_id, attestation, &mut self.network_context)
}
},
PubsubMessage::Unknown(message) => {
// Received a message from an unknown topic. Ignore for now
debug!(self.log, "Unknown Gossip Message"; "Peer" => format!("{}", peer_id), "Message" => format!("{:?}", message));
}
}
}
/* Decoding of blocks and attestations from the network.
*
* TODO: Apply efficient decoding/verification of these objects
*/
fn decode_gossip_block(
&self,
beacon_block: Vec<u8>,
) -> Result<BeaconBlock<T::EthSpec>, DecodeError> {
//TODO: Apply verification before decoding.
BeaconBlock::from_ssz_bytes(&beacon_block)
}
fn decode_gossip_attestation(
&self,
beacon_block: Vec<u8>,
) -> Result<Attestation<T::EthSpec>, DecodeError> {
//TODO: Apply verification before decoding.
Attestation::from_ssz_bytes(&beacon_block)
}
/// Verifies and decodes the ssz-encoded block bodies received from peers.
fn decode_block_bodies(
&self,
@@ -241,39 +297,18 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
//TODO: Implement faster header verification before decoding entirely
Vec::from_ssz_bytes(&headers_response.headers)
}
/// Handle various RPC errors
fn handle_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) {
//TODO: Handle error correctly
warn!(self.log, "RPC Error"; "Peer" => format!("{:?}", peer_id), "Request Id" => format!("{}", request_id), "Error" => format!("{:?}", error));
}
/// Handle RPC messages
fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage<T::EthSpec>) {
match gossip_message {
PubsubMessage::Block(message) => {
let _should_forward_on =
self.sync
.on_block_gossip(peer_id, message, &mut self.network_context);
}
PubsubMessage::Attestation(message) => {
self.sync
.on_attestation_gossip(peer_id, message, &mut self.network_context)
}
}
}
}
// TODO: RPC Rewrite makes this struct fairly pointless
pub struct NetworkContext<E: EthSpec> {
pub struct NetworkContext {
/// The network channel to relay messages to the Network service.
network_send: mpsc::UnboundedSender<NetworkMessage<E>>,
network_send: mpsc::UnboundedSender<NetworkMessage>,
/// The `MessageHandler` logger.
log: slog::Logger,
}
impl<E: EthSpec> NetworkContext<E> {
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage<E>>, log: slog::Logger) -> Self {
impl NetworkContext {
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage>, log: slog::Logger) -> Self {
Self { network_send, log }
}

View File

@@ -14,13 +14,12 @@ use slog::{debug, info, o, trace};
use std::sync::Arc;
use tokio::runtime::TaskExecutor;
use tokio::sync::{mpsc, oneshot};
use types::EthSpec;
/// Service that handles communication between internal services and the eth2_libp2p network service.
pub struct Service<T: BeaconChainTypes> {
libp2p_service: Arc<Mutex<LibP2PService<T::EthSpec>>>,
libp2p_service: Arc<Mutex<LibP2PService>>,
_libp2p_exit: oneshot::Sender<()>,
_network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
_network_send: mpsc::UnboundedSender<NetworkMessage>,
_phantom: PhantomData<T>, //message_handler: MessageHandler,
//message_handler_send: Sender<HandlerMessage>
}
@@ -31,9 +30,9 @@ impl<T: BeaconChainTypes + 'static> Service<T> {
config: &NetworkConfig,
executor: &TaskExecutor,
log: slog::Logger,
) -> error::Result<(Arc<Self>, mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>)> {
) -> error::Result<(Arc<Self>, mpsc::UnboundedSender<NetworkMessage>)> {
// build the network channel
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage<_>>();
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
// launch message handler thread
let message_handler_log = log.new(o!("Service" => "MessageHandler"));
let message_handler_send = MessageHandler::spawn(
@@ -65,15 +64,15 @@ impl<T: BeaconChainTypes + 'static> Service<T> {
Ok((Arc::new(network_service), network_send))
}
pub fn libp2p_service(&self) -> Arc<Mutex<LibP2PService<T::EthSpec>>> {
pub fn libp2p_service(&self) -> Arc<Mutex<LibP2PService>> {
self.libp2p_service.clone()
}
}
fn spawn_service<E: EthSpec>(
libp2p_service: Arc<Mutex<LibP2PService<E>>>,
network_recv: mpsc::UnboundedReceiver<NetworkMessage<E>>,
message_handler_send: mpsc::UnboundedSender<HandlerMessage<E>>,
fn spawn_service(
libp2p_service: Arc<Mutex<LibP2PService>>,
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
message_handler_send: mpsc::UnboundedSender<HandlerMessage>,
executor: &TaskExecutor,
log: slog::Logger,
) -> error::Result<tokio::sync::oneshot::Sender<()>> {
@@ -99,10 +98,10 @@ fn spawn_service<E: EthSpec>(
}
//TODO: Potentially handle channel errors
fn network_service<E: EthSpec>(
libp2p_service: Arc<Mutex<LibP2PService<E>>>,
mut network_recv: mpsc::UnboundedReceiver<NetworkMessage<E>>,
mut message_handler_send: mpsc::UnboundedSender<HandlerMessage<E>>,
fn network_service(
libp2p_service: Arc<Mutex<LibP2PService>>,
mut network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
mut message_handler_send: mpsc::UnboundedSender<HandlerMessage>,
log: slog::Logger,
) -> impl futures::Future<Item = (), Error = eth2_libp2p::error::Error> {
futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> {
@@ -119,7 +118,7 @@ fn network_service<E: EthSpec>(
},
NetworkMessage::Publish { topics, message } => {
debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics));
libp2p_service.lock().swarm.publish(topics, *message);
libp2p_service.lock().swarm.publish(topics, message);
}
},
Ok(Async::NotReady) => break,
@@ -176,14 +175,14 @@ fn network_service<E: EthSpec>(
/// Types of messages that the network service can receive.
#[derive(Debug)]
pub enum NetworkMessage<E: EthSpec> {
pub enum NetworkMessage {
/// Send a message to libp2p service.
//TODO: Define typing for messages across the wire
Send(PeerId, OutgoingMessage),
/// Publish a message to pubsub mechanism.
Publish {
topics: Vec<Topic>,
message: Box<PubsubMessage<E>>,
message: PubsubMessage,
},
}

View File

@@ -123,7 +123,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
/// Handle the connection of a new peer.
///
/// Sends a `Hello` message to the peer.
pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext<T::EthSpec>) {
pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext) {
info!(self.log, "PeerConnected"; "peer" => format!("{:?}", peer_id));
network.send_rpc_request(peer_id, RPCRequest::Hello(hello_message(&self.chain)));
@@ -137,7 +137,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
peer_id: PeerId,
request_id: RequestId,
hello: HelloMessage,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
debug!(self.log, "HelloRequest"; "peer" => format!("{:?}", peer_id));
@@ -156,7 +156,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
hello: HelloMessage,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
debug!(self.log, "HelloResponse"; "peer" => format!("{:?}", peer_id));
@@ -171,7 +171,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
hello: HelloMessage,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
let remote = PeerSyncInfo::from(hello);
let local = PeerSyncInfo::from(&self.chain);
@@ -278,7 +278,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
peer_id: PeerId,
request_id: RequestId,
req: BeaconBlockRootsRequest,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
debug!(
self.log,
@@ -323,7 +323,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
res: BeaconBlockRootsResponse,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
debug!(
self.log,
@@ -387,7 +387,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
peer_id: PeerId,
request_id: RequestId,
req: BeaconBlockHeadersRequest,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
debug!(
self.log,
@@ -440,7 +440,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
headers: Vec<BeaconBlockHeader>,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
debug!(
self.log,
@@ -472,7 +472,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
peer_id: PeerId,
request_id: RequestId,
req: BeaconBlockBodiesRequest,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
let block_bodies: Vec<BeaconBlockBody<_>> = req
.block_roots
@@ -518,7 +518,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
res: DecodedBeaconBlockBodiesResponse<T::EthSpec>,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
debug!(
self.log,
@@ -557,7 +557,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
block: BeaconBlock<T::EthSpec>,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) -> bool {
if let Some(outcome) =
self.process_block(peer_id.clone(), block.clone(), network, &"gossip")
@@ -627,7 +627,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
_peer_id: PeerId,
msg: Attestation<T::EthSpec>,
_network: &mut NetworkContext<T::EthSpec>,
_network: &mut NetworkContext,
) {
match self.chain.process_attestation(msg) {
Ok(()) => info!(self.log, "ImportedAttestation"; "source" => "gossip"),
@@ -642,7 +642,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
req: BeaconBlockRootsRequest,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
// Potentially set state to sync.
if self.state == SyncState::Idle && req.count > SLOT_IMPORT_TOLERANCE {
@@ -666,7 +666,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
req: BeaconBlockHeadersRequest,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
debug!(
self.log,
@@ -683,7 +683,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
req: BeaconBlockBodiesRequest,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
) {
debug!(
self.log,
@@ -719,7 +719,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
block_root: Hash256,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
source: &str,
) -> Option<BlockProcessingOutcome> {
match self.import_queue.attempt_complete_block(block_root) {
@@ -812,7 +812,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
&mut self,
peer_id: PeerId,
block: BeaconBlock<T::EthSpec>,
network: &mut NetworkContext<T::EthSpec>,
network: &mut NetworkContext,
source: &str,
) -> Option<BlockProcessingOutcome> {
let processing_result = self.chain.process_block(block.clone());
@@ -917,8 +917,8 @@ fn hello_message<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) -> HelloMes
network_id: spec.network_id,
//TODO: Correctly define the chain id
chain_id: spec.network_id as u64,
latest_finalized_root: state.finalized_root,
latest_finalized_epoch: state.finalized_epoch,
latest_finalized_root: state.finalized_checkpoint.root,
latest_finalized_epoch: state.finalized_checkpoint.epoch,
best_root: beacon_chain.head().beacon_block_root,
best_slot: state.slot,
}