diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 0229d06d50..41b7c89657 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -170,9 +170,11 @@ impl Behaviour { } /// Publishes a message on the pubsub (gossipsub) behaviour. - pub fn publish(&mut self, topic: Topic, message: PubsubMessage) { + pub fn publish(&mut self, topics: Vec, message: PubsubMessage) { let message_bytes = ssz_encode(&message); - self.gossipsub.publish(topic, message_bytes); + for topic in topics { + self.gossipsub.publish(topic, message_bytes.clone()); + } } } @@ -189,23 +191,13 @@ pub enum BehaviourEvent { }, } -/// Gossipsub message providing notification of a new block. -#[derive(Encode, Decode, Clone, Debug, PartialEq)] -pub struct BlockGossip { - pub root: BlockRootSlot, -} - -/// Gossipsub message providing notification of a new attestation. -#[derive(Encode, Decode, Clone, Debug, PartialEq)] -pub struct AttestationGossip { - pub attestation: Attestation, -} - /// Messages that are passed to and from the pubsub (Gossipsub) behaviour. #[derive(Debug, Clone)] pub enum PubsubMessage { - Block(BlockGossip), - Attestation(AttestationGossip), + /// Gossipsub message providing notification of a new block. + Block(BlockRootSlot), + /// Gossipsub message providing notification of a new attestation. + Attestation(Attestation), } //TODO: Correctly encode/decode enums. Prefixing with integer for now. @@ -229,11 +221,11 @@ impl Decodable for PubsubMessage { let (id, index) = u32::ssz_decode(bytes, index)?; match id { 1 => { - let (block, index) = BlockGossip::ssz_decode(bytes, index)?; + let (block, index) = BlockRootSlot::ssz_decode(bytes, index)?; Ok((PubsubMessage::Block(block), index)) } 2 => { - let (attestation, index) = AttestationGossip::ssz_decode(bytes, index)?; + let (attestation, index) = Attestation::ssz_decode(bytes, index)?; Ok((PubsubMessage::Attestation(attestation), index)) } _ => Err(DecodeError::Invalid), diff --git a/beacon_node/network/src/lib.rs b/beacon_node/network/src/lib.rs index 87f8368a50..c298e31b4e 100644 --- a/beacon_node/network/src/lib.rs +++ b/beacon_node/network/src/lib.rs @@ -6,4 +6,5 @@ pub mod service; pub mod sync; pub use eth2_libp2p::NetworkConfig; +pub use service::NetworkMessage; pub use service::Service; diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 55c43e4ec6..b2d2b5a246 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -165,9 +165,9 @@ fn network_service( } }; } - Ok(NetworkMessage::Publish(topic, message)) => { - debug!(log, "Sending pubsub message on topic {:?}", topic); - libp2p_service.swarm.publish(topic, message); + Ok(NetworkMessage::Publish { topics, message }) => { + debug!(log, "Sending pubsub message on topics {:?}", topics); + libp2p_service.swarm.publish(topics, message); } Err(TryRecvError::Empty) => break, Err(TryRecvError::Disconnected) => { @@ -188,7 +188,10 @@ pub enum NetworkMessage { //TODO: Define typing for messages across the wire Send(PeerId, OutgoingMessage), /// Publish a message to pubsub mechanism. - Publish(Topic, PubsubMessage), + Publish { + topics: Vec, + message: PubsubMessage, + }, } /// Type of outgoing messages that can be sent through the network service. diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 4c08a68710..2aa0a1d7dd 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -1,7 +1,6 @@ use super::import_queue::ImportQueue; use crate::beacon_chain::BeaconChain; use crate::message_handler::NetworkContext; -use eth2_libp2p::behaviour::{AttestationGossip, BlockGossip}; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; @@ -9,7 +8,7 @@ use slog::{debug, error, info, o, warn}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use types::{Epoch, Hash256, Slot}; +use types::{Attestation, Epoch, Hash256, Slot}; /// The number of slots that we can import blocks ahead of us, before going into full Sync mode. const SLOT_IMPORT_TOLERANCE: u64 = 100; @@ -521,12 +520,12 @@ impl SimpleSync { pub fn on_block_gossip( &mut self, peer_id: PeerId, - msg: BlockGossip, + msg: BlockRootSlot, network: &mut NetworkContext, ) { debug!( self.log, - "BlockGossip"; + "BlockSlot"; "peer" => format!("{:?}", peer_id), ); // TODO: filter out messages that a prior to the finalized slot. @@ -535,12 +534,12 @@ impl SimpleSync { // now. // // Note: only requests the new block -- will fail if we don't have its parents. - if self.import_queue.is_new_block(&msg.root.block_root) { + if self.import_queue.is_new_block(&msg.block_root) { self.request_block_headers( peer_id, BeaconBlockHeadersRequest { - start_root: msg.root.block_root, - start_slot: msg.root.slot, + start_root: msg.block_root, + start_slot: msg.slot, max_headers: 1, skip_slots: 0, }, @@ -555,19 +554,19 @@ impl SimpleSync { pub fn on_attestation_gossip( &mut self, peer_id: PeerId, - msg: AttestationGossip, + msg: Attestation, _network: &mut NetworkContext, ) { debug!( self.log, - "AttestationGossip"; + "Attestation"; "peer" => format!("{:?}", peer_id), ); // Awaiting a proper operations pool before we can import attestations. // // https://github.com/sigp/lighthouse/issues/281 - match self.chain.process_attestation(msg.attestation) { + match self.chain.process_attestation(msg) { Ok(_) => panic!("Impossible, method not implemented."), Err(_) => error!(self.log, "Attestation processing not implemented!"), } diff --git a/beacon_node/rpc/Cargo.toml b/beacon_node/rpc/Cargo.toml index e9709c1ced..3fc52c6b16 100644 --- a/beacon_node/rpc/Cargo.toml +++ b/beacon_node/rpc/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" bls = { path = "../../eth2/utils/bls" } beacon_chain = { path = "../beacon_chain" } network = { path = "../network" } +eth2-libp2p = { path = "../eth2-libp2p" } version = { path = "../version" } types = { path = "../../eth2/types" } ssz = { path = "../../eth2/utils/ssz" } diff --git a/beacon_node/rpc/src/beacon_block.rs b/beacon_node/rpc/src/beacon_block.rs index d124152f13..4e1875665b 100644 --- a/beacon_node/rpc/src/beacon_block.rs +++ b/beacon_node/rpc/src/beacon_block.rs @@ -1,17 +1,20 @@ +use crossbeam_channel; +use eth2_libp2p::rpc::methods::BlockRootSlot; +use eth2_libp2p::PubsubMessage; use futures::Future; use grpcio::{RpcContext, UnarySink}; +use network::NetworkMessage; use protos::services::{ BeaconBlock as BeaconBlockProto, ProduceBeaconBlockRequest, ProduceBeaconBlockResponse, PublishBeaconBlockRequest, PublishBeaconBlockResponse, }; use protos::services_grpc::BeaconBlockService; use slog::Logger; -use crossbeam_channel; -use network::NetworkMessage; +use types::{Hash256, Slot}; #[derive(Clone)] pub struct BeaconBlockServiceInstance { - network_chan: crossbeam_channel::Sender, + pub network_chan: crossbeam_channel::Sender, pub log: Logger, } @@ -47,14 +50,21 @@ impl BeaconBlockService for BeaconBlockServiceInstance { sink: UnarySink, ) { let block = req.get_block(); - println!("publishing {:?}", block); + let block_root = Hash256::from_slice(block.get_block_root()); + let block_slot = BlockRootSlot { + block_root, + slot: Slot::from(block.get_slot()), + }; + println!("publishing block with root {:?}", block_root); - - // TODO: Obtain from the network properly. - let topic = types::TopicBuilder::from("beacon_chain").build(); + // TODO: Obtain topics from the network service properly. + let topic = types::TopicBuilder::new("beacon_chain".to_string()).build(); + let message = PubsubMessage::Block(block_slot); println!("Sending beacon block to gossipsub"); - network_chan.send(NetworkMessage::Publish( - + self.network_chan.send(NetworkMessage::Publish { + topics: vec![topic], + message, + }); // TODO: actually process the block. let mut resp = PublishBeaconBlockResponse::new(); diff --git a/beacon_node/rpc/src/lib.rs b/beacon_node/rpc/src/lib.rs index e1267270ca..4dfd334879 100644 --- a/beacon_node/rpc/src/lib.rs +++ b/beacon_node/rpc/src/lib.rs @@ -11,6 +11,7 @@ use self::validator::ValidatorServiceInstance; pub use config::Config as RPCConfig; use futures::{future, Future}; use grpcio::{Environment, Server, ServerBuilder}; +use network::NetworkMessage; use protos::services_grpc::{ create_beacon_block_service, create_beacon_node_service, create_validator_service, }; @@ -42,8 +43,9 @@ pub fn start_server( let beacon_block_service = { let instance = BeaconBlockServiceInstance { - network_chan - log: log.clone() }; + network_chan, + log: log.clone(), + }; create_beacon_block_service(instance) }; let validator_service = {