Merge branch 'paul-gossip-test' of github.com:sigp/lighthouse into paul-gossip-test

This commit is contained in:
Paul Hauner
2019-03-26 09:47:38 +11:00
39 changed files with 1283 additions and 330 deletions

View File

@@ -456,8 +456,8 @@ where
}
/// Produce an `AttestationData` that is valid for the present `slot` and given `shard`.
pub fn produce_attestation_data(&self, shard: u64) -> Result<AttestationData, Error> {
trace!("BeaconChain::produce_attestation_data: shard: {}", shard);
pub fn produce_attestation(&self, shard: u64) -> Result<AttestationData, Error> {
trace!("BeaconChain::produce_attestation: shard: {}", shard);
let source_epoch = self.state.read().current_justified_epoch;
let source_root = *self.state.read().get_block_root(
source_epoch.start_slot(self.spec.slots_per_epoch),

View File

@@ -50,18 +50,18 @@ impl<T: ClientDB, U: SlotClock, F: ForkChoice> DirectBeaconNode<T, U, F> {
}
impl<T: ClientDB, U: SlotClock, F: ForkChoice> AttesterBeaconNode for DirectBeaconNode<T, U, F> {
fn produce_attestation_data(
fn produce_attestation(
&self,
_slot: Slot,
shard: u64,
) -> Result<Option<AttestationData>, NodeError> {
match self.beacon_chain.produce_attestation_data(shard) {
match self.beacon_chain.produce_attestation(shard) {
Ok(attestation_data) => Ok(Some(attestation_data)),
Err(e) => Err(NodeError::RemoteFailure(format!("{:?}", e))),
}
}
fn publish_attestation_data(
fn publish_attestation(
&self,
free_attestation: FreeAttestation,
) -> Result<AttestationPublishOutcome, NodeError> {

View File

@@ -24,12 +24,8 @@ pub struct Client<T: ClientTypes> {
beacon_chain: Arc<BeaconChain<T::DB, T::SlotClock, T::ForkChoice>>,
/// Reference to the network service.
pub network: Arc<NetworkService>,
/// Future to stop and begin shutdown of the Client.
//TODO: Decide best way to handle shutdown
pub exit: exit_future::Exit,
/// The sending future to call to terminate the Client.
//TODO: Decide best way to handle shutdown
pub exit_signal: Signal,
/// Signal to terminate the RPC server.
pub rpc_exit_signal: Option<Signal>,
/// The clients logger.
log: slog::Logger,
/// Marker to pin the beacon chain generics.
@@ -43,8 +39,6 @@ impl<TClientType: ClientTypes> Client<TClientType> {
log: slog::Logger,
executor: &TaskExecutor,
) -> error::Result<Self> {
let (exit_signal, exit) = exit_future::signal();
// generate a beacon chain
let beacon_chain = TClientType::initialise_beacon_chain(&config);
@@ -52,23 +46,29 @@ impl<TClientType: ClientTypes> Client<TClientType> {
// TODO: Add beacon_chain reference to network parameters
let network_config = &config.net_conf;
let network_logger = log.new(o!("Service" => "Network"));
let (network, _network_send) = NetworkService::new(
let (network, network_send) = NetworkService::new(
beacon_chain.clone(),
network_config,
executor,
network_logger,
)?;
let mut rpc_exit_signal = None;
// spawn the RPC server
if config.rpc_conf.enabled {
rpc::start_server(&config.rpc_conf, &log);
rpc_exit_signal = Some(rpc::start_server(
&config.rpc_conf,
executor,
network_send,
beacon_chain.clone(),
&log,
));
}
Ok(Client {
config,
beacon_chain,
exit,
exit_signal,
rpc_exit_signal,
log,
network,
phantom: PhantomData,

View File

@@ -13,10 +13,11 @@ use libp2p::{
tokio_io::{AsyncRead, AsyncWrite},
NetworkBehaviour, PeerId,
};
use slog::{debug, o};
use slog::{debug, o, warn};
use ssz::{ssz_encode, Decodable, DecodeError, Encodable, SszStream};
use ssz_derive::{Decode, Encode};
use types::Attestation;
use types::Topic;
use types::{Topic, TopicHash};
/// Builds the network behaviour for the libp2p Swarm.
/// Implements gossipsub message routing.
@@ -47,13 +48,33 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<GossipsubE
{
fn inject_event(&mut self, event: GossipsubEvent) {
match event {
GossipsubEvent::Message(message) => {
let gs_message = String::from_utf8_lossy(&message.data);
// TODO: Remove this type - debug only
self.events
.push(BehaviourEvent::Message(gs_message.to_string()))
GossipsubEvent::Message(gs_msg) => {
let pubsub_message = match PubsubMessage::ssz_decode(&gs_msg.data, 0) {
//TODO: Punish peer on error
Err(e) => {
warn!(
self.log,
"Received undecodable message from Peer {:?}", gs_msg.source
);
return;
}
Ok((msg, _index)) => msg,
};
self.events.push(BehaviourEvent::GossipMessage {
source: gs_msg.source,
topics: gs_msg.topics,
message: pubsub_message,
});
}
_ => {}
GossipsubEvent::Subscribed {
peer_id: _,
topic: _,
}
| GossipsubEvent::Unsubscribed {
peer_id: _,
topic: _,
} => {}
}
}
}
@@ -147,6 +168,14 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
self.serenity_rpc.send_rpc(peer_id, rpc_event);
}
/// Publishes a message on the pubsub (gossipsub) behaviour.
pub fn publish(&mut self, topics: Vec<Topic>, message: PubsubMessage) {
let message_bytes = ssz_encode(&message);
for topic in topics {
self.gossipsub.publish(topic, message_bytes.clone());
}
}
}
/// The types of events than can be obtained from polling the behaviour.
@@ -155,23 +184,51 @@ pub enum BehaviourEvent {
PeerDialed(PeerId),
Identified(PeerId, IdentifyInfo),
// TODO: This is a stub at the moment
Message(String),
GossipMessage {
source: PeerId,
topics: Vec<TopicHash>,
message: PubsubMessage,
},
}
/// Messages that are passed to and from the pubsub (Gossipsub) behaviour.
#[derive(Debug, Clone)]
pub enum IncomingGossip {
Block(BlockGossip),
Attestation(AttestationGossip),
pub enum PubsubMessage {
/// Gossipsub message providing notification of a new block.
Block(BlockRootSlot),
/// Gossipsub message providing notification of a new attestation.
Attestation(Attestation),
}
/// Gossipsub message providing notification of a new block.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BlockGossip {
pub root: BlockRootSlot,
//TODO: Correctly encode/decode enums. Prefixing with integer for now.
impl Encodable for PubsubMessage {
fn ssz_append(&self, s: &mut SszStream) {
match self {
PubsubMessage::Block(block_gossip) => {
0u32.ssz_append(s);
block_gossip.ssz_append(s);
}
PubsubMessage::Attestation(attestation_gossip) => {
1u32.ssz_append(s);
attestation_gossip.ssz_append(s);
}
}
}
}
/// Gossipsub message providing notification of a new attestation.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct AttestationGossip {
pub attestation: Attestation,
impl Decodable for PubsubMessage {
fn ssz_decode(bytes: &[u8], index: usize) -> Result<(Self, usize), DecodeError> {
let (id, index) = u32::ssz_decode(bytes, index)?;
match id {
1 => {
let (block, index) = BlockRootSlot::ssz_decode(bytes, index)?;
Ok((PubsubMessage::Block(block), index))
}
2 => {
let (attestation, index) = Attestation::ssz_decode(bytes, index)?;
Ok((PubsubMessage::Attestation(attestation), index))
}
_ => Err(DecodeError::Invalid),
}
}
}

View File

@@ -8,12 +8,13 @@ pub mod error;
pub mod rpc;
mod service;
pub use behaviour::PubsubMessage;
pub use config::Config as NetworkConfig;
pub use libp2p::{
gossipsub::{GossipsubConfig, GossipsubConfigBuilder},
PeerId,
};
pub use rpc::{HelloMessage, RPCEvent};
pub use rpc::RPCEvent;
pub use service::Libp2pEvent;
pub use service::Service;
pub use types::multiaddr;

View File

@@ -1,4 +1,4 @@
use crate::behaviour::{Behaviour, BehaviourEvent};
use crate::behaviour::{Behaviour, BehaviourEvent, PubsubMessage};
use crate::error;
use crate::multiaddr::Protocol;
use crate::rpc::RPCEvent;
@@ -16,7 +16,7 @@ use libp2p::{core, secio, PeerId, Swarm, Transport};
use slog::{debug, info, trace, warn};
use std::io::{Error, ErrorKind};
use std::time::Duration;
use types::TopicBuilder;
use types::{TopicBuilder, TopicHash};
/// The configuration and state of the libp2p components for the beacon node.
pub struct Service {
@@ -107,9 +107,17 @@ impl Stream for Service {
//Behaviour events
Ok(Async::Ready(Some(event))) => match event {
// TODO: Stub here for debugging
BehaviourEvent::Message(m) => {
debug!(self.log, "Message received: {}", m);
return Ok(Async::Ready(Some(Libp2pEvent::Message(m))));
BehaviourEvent::GossipMessage {
source,
topics,
message,
} => {
debug!(self.log, "Pubsub message received: {:?}", message);
return Ok(Async::Ready(Some(Libp2pEvent::PubsubMessage {
source,
topics,
message,
})));
}
BehaviourEvent::RPC(peer_id, event) => {
return Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, event))));
@@ -173,6 +181,10 @@ pub enum Libp2pEvent {
PeerDialed(PeerId),
/// Received information about a peer on the network.
Identified(PeerId, IdentifyInfo),
// TODO: Pub-sub testing only.
Message(String),
/// Received pubsub message.
PubsubMessage {
source: PeerId,
topics: Vec<TopicHash>,
message: PubsubMessage,
},
}

View File

@@ -7,7 +7,7 @@ use beacon_chain::{
types::{BeaconState, ChainSpec},
AggregationOutcome, CheckPoint,
};
use eth2_libp2p::HelloMessage;
use eth2_libp2p::rpc::HelloMessage;
use types::{Attestation, BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot};
pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome};
@@ -109,7 +109,7 @@ where
let state = self.get_state();
HelloMessage {
network_id: spec.network_id,
network_id: spec.chain_id,
latest_finalized_root: state.finalized_root,
latest_finalized_epoch: state.finalized_epoch,
best_root: self.best_block_root(),

View File

@@ -6,4 +6,5 @@ pub mod service;
pub mod sync;
pub use eth2_libp2p::NetworkConfig;
pub use service::NetworkMessage;
pub use service::Service;

View File

@@ -4,7 +4,7 @@ use crate::service::{NetworkMessage, OutgoingMessage};
use crate::sync::SimpleSync;
use crossbeam_channel::{unbounded as channel, Sender};
use eth2_libp2p::{
behaviour::IncomingGossip,
behaviour::PubsubMessage,
rpc::{methods::GoodbyeReason, RPCRequest, RPCResponse, RequestId},
PeerId, RPCEvent,
};
@@ -41,7 +41,7 @@ pub enum HandlerMessage {
/// An RPC response/request has been received.
RPC(PeerId, RPCEvent),
/// A gossip message has been received.
IncomingGossip(PeerId, IncomingGossip),
PubsubMessage(PeerId, PubsubMessage),
}
impl MessageHandler {
@@ -92,7 +92,7 @@ impl MessageHandler {
self.handle_rpc_message(peer_id, rpc_event);
}
// we have received an RPC message request/response
HandlerMessage::IncomingGossip(peer_id, gossip) => {
HandlerMessage::PubsubMessage(peer_id, gossip) => {
self.handle_gossip(peer_id, gossip);
}
//TODO: Handle all messages
@@ -205,13 +205,13 @@ impl MessageHandler {
}
/// Handle RPC messages
fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: IncomingGossip) {
fn handle_gossip(&mut self, peer_id: PeerId, gossip_message: PubsubMessage) {
match gossip_message {
IncomingGossip::Block(message) => {
PubsubMessage::Block(message) => {
self.sync
.on_block_gossip(peer_id, message, &mut self.network_context)
}
IncomingGossip::Attestation(message) => {
PubsubMessage::Attestation(message) => {
self.sync
.on_attestation_gossip(peer_id, message, &mut self.network_context)
}

View File

@@ -3,15 +3,16 @@ use crate::error;
use crate::message_handler::{HandlerMessage, MessageHandler};
use crate::NetworkConfig;
use crossbeam_channel::{unbounded as channel, Sender, TryRecvError};
use eth2_libp2p::RPCEvent;
use eth2_libp2p::Service as LibP2PService;
use eth2_libp2p::{Libp2pEvent, PeerId};
use eth2_libp2p::{PubsubMessage, RPCEvent};
use futures::prelude::*;
use futures::sync::oneshot;
use futures::Stream;
use slog::{debug, info, o, trace};
use std::sync::Arc;
use tokio::runtime::TaskExecutor;
use types::Topic;
/// Service that handles communication between internal services and the eth2_libp2p network service.
pub struct Service {
@@ -99,6 +100,7 @@ fn spawn_service(
Ok(network_exit)
}
//TODO: Potentially handle channel errors
fn network_service(
mut libp2p_service: LibP2PService,
network_recv: crossbeam_channel::Receiver<NetworkMessage>,
@@ -128,10 +130,17 @@ fn network_service(
"We have identified peer: {:?} with {:?}", peer_id, info
);
}
Libp2pEvent::Message(m) => debug!(
libp2p_service.log,
"Network Service: Message received: {}", m
),
Libp2pEvent::PubsubMessage {
source,
topics: _,
message,
} => {
//TODO: Decide if we need to propagate the topic upwards. (Potentially for
//attestations)
message_handler_send
.send(HandlerMessage::PubsubMessage(source, message))
.map_err(|_| " failed to send pubsub message to handler")?;
}
},
Ok(Async::Ready(None)) => unreachable!("Stream never ends"),
Ok(Async::NotReady) => break,
@@ -156,6 +165,10 @@ fn network_service(
}
};
}
Ok(NetworkMessage::Publish { topics, message }) => {
debug!(log, "Sending pubsub message on topics {:?}", topics);
libp2p_service.swarm.publish(topics, message);
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
return Err(eth2_libp2p::error::Error::from(
@@ -174,6 +187,11 @@ pub enum NetworkMessage {
/// Send a message to libp2p service.
//TODO: Define typing for messages across the wire
Send(PeerId, OutgoingMessage),
/// Publish a message to pubsub mechanism.
Publish {
topics: Vec<Topic>,
message: PubsubMessage,
},
}
/// Type of outgoing messages that can be sent through the network service.

View File

@@ -1,7 +1,6 @@
use super::import_queue::ImportQueue;
use crate::beacon_chain::BeaconChain;
use crate::message_handler::NetworkContext;
use eth2_libp2p::behaviour::{AttestationGossip, BlockGossip};
use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId};
use eth2_libp2p::PeerId;
@@ -9,7 +8,7 @@ use slog::{debug, error, info, o, warn};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use types::{Epoch, Hash256, Slot};
use types::{Attestation, Epoch, Hash256, Slot};
/// The number of slots that we can import blocks ahead of us, before going into full Sync mode.
const SLOT_IMPORT_TOLERANCE: u64 = 100;
@@ -521,12 +520,12 @@ impl SimpleSync {
pub fn on_block_gossip(
&mut self,
peer_id: PeerId,
msg: BlockGossip,
msg: BlockRootSlot,
network: &mut NetworkContext,
) {
debug!(
self.log,
"BlockGossip";
"BlockSlot";
"peer" => format!("{:?}", peer_id),
);
// TODO: filter out messages that a prior to the finalized slot.
@@ -535,12 +534,12 @@ impl SimpleSync {
// now.
//
// Note: only requests the new block -- will fail if we don't have its parents.
if self.import_queue.is_new_block(&msg.root.block_root) {
if self.import_queue.is_new_block(&msg.block_root) {
self.request_block_headers(
peer_id,
BeaconBlockHeadersRequest {
start_root: msg.root.block_root,
start_slot: msg.root.slot,
start_root: msg.block_root,
start_slot: msg.slot,
max_headers: 1,
skip_slots: 0,
},
@@ -555,19 +554,19 @@ impl SimpleSync {
pub fn on_attestation_gossip(
&mut self,
peer_id: PeerId,
msg: AttestationGossip,
msg: Attestation,
_network: &mut NetworkContext,
) {
debug!(
self.log,
"AttestationGossip";
"Attestation";
"peer" => format!("{:?}", peer_id),
);
// Awaiting a proper operations pool before we can import attestations.
//
// https://github.com/sigp/lighthouse/issues/281
match self.chain.process_attestation(msg.attestation) {
match self.chain.process_attestation(msg) {
Ok(_) => panic!("Impossible, method not implemented."),
Err(_) => error!(self.log, "Attestation processing not implemented!"),
}

View File

@@ -7,7 +7,12 @@ edition = "2018"
[dependencies]
bls = { path = "../../eth2/utils/bls" }
beacon_chain = { path = "../beacon_chain" }
network = { path = "../network" }
eth2-libp2p = { path = "../eth2-libp2p" }
version = { path = "../version" }
types = { path = "../../eth2/types" }
ssz = { path = "../../eth2/utils/ssz" }
slot_clock = { path = "../../eth2/utils/slot_clock" }
protos = { path = "../../protos" }
grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] }
protobuf = "2.0.2"
@@ -16,8 +21,8 @@ db = { path = "../db" }
dirs = "1.0.3"
futures = "0.1.23"
slog = "^2.2.3"
slot_clock = { path = "../../eth2/utils/slot_clock" }
slog-term = "^2.4.0"
slog-async = "^2.3.0"
types = { path = "../../eth2/types" }
ssz = { path = "../../eth2/utils/ssz" }
tokio = "0.1.17"
exit-future = "0.1.4"
crossbeam-channel = "0.3.8"

View File

@@ -0,0 +1,61 @@
use futures::Future;
use grpcio::{RpcContext, UnarySink};
use protos::services::{
Attestation as AttestationProto, ProduceAttestation, ProduceAttestationResponse,
ProduceAttestationRequest, PublishAttestationResponse, PublishAttestationRequest,
PublishAttestation
};
use protos::services_grpc::BeaconBlockService;
use slog::Logger;
#[derive(Clone)]
pub struct AttestationServiceInstance {
pub log: Logger,
}
impl AttestationService for AttestationServiceInstance {
/// Produce a `BeaconBlock` for signing by a validator.
fn produce_attestation(
&mut self,
ctx: RpcContext,
req: ProduceAttestationRequest,
sink: UnarySink<ProduceAttestationResponse>,
) {
println!("producing attestation at slot {}", req.get_slot());
// TODO: build a legit block.
let mut attestation = AttestationProto::new();
attestation.set_slot(req.get_slot());
// TODO Set the shard to something legit.
attestation.set_shard(0);
attestation.set_block_root(b"cats".to_vec());
let mut resp = ProduceAttestationResponse::new();
resp.set_attestation_data(attestation);
let f = sink
.success(resp)
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
}
/// Accept some fully-formed `BeaconBlock`, process and publish it.
fn publish_attestation(
&mut self,
ctx: RpcContext,
req: PublishAttestationRequest,
sink: UnarySink<PublishAttestationResponse>,
) {
println!("publishing attestation {:?}", req.get_block());
// TODO: actually process the block.
let mut resp = PublishAttestationResponse::new();
resp.set_success(true);
let f = sink
.success(resp)
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
ctx.spawn(f)
}
}

View File

@@ -1,14 +1,20 @@
use crossbeam_channel;
use eth2_libp2p::rpc::methods::BlockRootSlot;
use eth2_libp2p::PubsubMessage;
use futures::Future;
use grpcio::{RpcContext, UnarySink};
use network::NetworkMessage;
use protos::services::{
BeaconBlock as BeaconBlockProto, ProduceBeaconBlockRequest, ProduceBeaconBlockResponse,
PublishBeaconBlockRequest, PublishBeaconBlockResponse,
};
use protos::services_grpc::BeaconBlockService;
use slog::Logger;
use types::{Hash256, Slot};
#[derive(Clone)]
pub struct BeaconBlockServiceInstance {
pub network_chan: crossbeam_channel::Sender<NetworkMessage>,
pub log: Logger,
}
@@ -43,7 +49,22 @@ impl BeaconBlockService for BeaconBlockServiceInstance {
req: PublishBeaconBlockRequest,
sink: UnarySink<PublishBeaconBlockResponse>,
) {
println!("publishing {:?}", req.get_block());
let block = req.get_block();
let block_root = Hash256::from_slice(block.get_block_root());
let block_slot = BlockRootSlot {
block_root,
slot: Slot::from(block.get_slot()),
};
println!("publishing block with root {:?}", block_root);
// TODO: Obtain topics from the network service properly.
let topic = types::TopicBuilder::new("beacon_chain".to_string()).build();
let message = PubsubMessage::Block(block_slot);
println!("Sending beacon block to gossipsub");
self.network_chan.send(NetworkMessage::Publish {
topics: vec![topic],
message,
});
// TODO: actually process the block.
let mut resp = PublishBeaconBlockResponse::new();

View File

@@ -0,0 +1,31 @@
use beacon_chain::BeaconChain as RawBeaconChain;
use beacon_chain::{
db::ClientDB,
fork_choice::ForkChoice,
parking_lot::RwLockReadGuard,
slot_clock::SlotClock,
types::{BeaconState, ChainSpec},
CheckPoint,
};
/// The RPC's API to the beacon chain.
pub trait BeaconChain: Send + Sync {
fn get_spec(&self) -> &ChainSpec;
fn get_state(&self) -> RwLockReadGuard<BeaconState>;
}
impl<T, U, F> BeaconChain for RawBeaconChain<T, U, F>
where
T: ClientDB + Sized,
U: SlotClock,
F: ForkChoice,
{
fn get_spec(&self) -> &ChainSpec {
&self.spec
}
fn get_state(&self) -> RwLockReadGuard<BeaconState> {
self.state.read()
}
}

View File

@@ -0,0 +1,46 @@
use crate::beacon_chain::BeaconChain;
use futures::Future;
use grpcio::{RpcContext, UnarySink};
use protos::services::{Empty, Fork, NodeInfo};
use protos::services_grpc::BeaconNodeService;
use slog::{trace, warn};
use std::sync::Arc;
#[derive(Clone)]
pub struct BeaconNodeServiceInstance {
pub chain: Arc<BeaconChain>,
pub log: slog::Logger,
}
impl BeaconNodeService for BeaconNodeServiceInstance {
/// Provides basic node information.
fn info(&mut self, ctx: RpcContext, _req: Empty, sink: UnarySink<NodeInfo>) {
trace!(self.log, "Node info requested via RPC");
// build the response
let mut node_info = NodeInfo::new();
node_info.set_version(version::version());
// get the chain state
let state = self.chain.get_state();
let state_fork = state.fork.clone();
let genesis_time = state.genesis_time.clone();
// build the rpc fork struct
let mut fork = Fork::new();
fork.set_previous_version(state_fork.previous_version.to_vec());
fork.set_current_version(state_fork.current_version.to_vec());
fork.set_epoch(state_fork.epoch.into());
node_info.set_fork(fork);
node_info.set_genesis_time(genesis_time);
node_info.set_chain_id(self.chain.get_spec().chain_id as u32);
// send the node_info the requester
let error_log = self.log.clone();
let f = sink
.success(node_info)
.map_err(move |e| warn!(error_log, "failed to reply {:?}", e));
ctx.spawn(f)
}
}

View File

@@ -1,22 +1,51 @@
mod beacon_block;
pub mod beacon_chain;
mod beacon_node;
pub mod config;
mod validator;
use self::beacon_block::BeaconBlockServiceInstance;
use self::beacon_chain::BeaconChain;
use self::beacon_node::BeaconNodeServiceInstance;
use self::validator::ValidatorServiceInstance;
pub use config::Config as RPCConfig;
use futures::{future, Future};
use grpcio::{Environment, Server, ServerBuilder};
use protos::services_grpc::{create_beacon_block_service, create_validator_service};
use network::NetworkMessage;
use protos::services_grpc::{
create_beacon_block_service, create_beacon_node_service, create_validator_service,
};
use slog::{info, o, warn};
use std::sync::Arc;
use tokio::runtime::TaskExecutor;
use slog::{info, o};
pub fn start_server(config: &RPCConfig, log: &slog::Logger) -> Server {
pub fn start_server(
config: &RPCConfig,
executor: &TaskExecutor,
network_chan: crossbeam_channel::Sender<NetworkMessage>,
beacon_chain: Arc<BeaconChain>,
log: &slog::Logger,
) -> exit_future::Signal {
let log = log.new(o!("Service"=>"RPC"));
let env = Arc::new(Environment::new(1));
// build a channel to kill the rpc server
let (rpc_exit_signal, rpc_exit) = exit_future::signal();
// build the individual rpc services
let beacon_node_service = {
let instance = BeaconNodeServiceInstance {
chain: beacon_chain.clone(),
log: log.clone(),
};
create_beacon_node_service(instance)
};
let beacon_block_service = {
let instance = BeaconBlockServiceInstance { log: log.clone() };
let instance = BeaconBlockServiceInstance {
network_chan,
log: log.clone(),
};
create_beacon_block_service(instance)
};
let validator_service = {
@@ -27,12 +56,26 @@ pub fn start_server(config: &RPCConfig, log: &slog::Logger) -> Server {
let mut server = ServerBuilder::new(env)
.register_service(beacon_block_service)
.register_service(validator_service)
.register_service(beacon_node_service)
.bind(config.listen_address.to_string(), config.port)
.build()
.unwrap();
server.start();
for &(ref host, port) in server.bind_addrs() {
info!(log, "gRPC listening on {}:{}", host, port);
}
server
let spawn_rpc = {
server.start();
for &(ref host, port) in server.bind_addrs() {
info!(log, "gRPC listening on {}:{}", host, port);
}
rpc_exit.and_then(move |_| {
info!(log, "RPC Server shutting down");
server
.shutdown()
.wait()
.map(|_| ())
.map_err(|e| warn!(log, "RPC server failed to shutdown: {:?}", e))?;
Ok(())
})
};
executor.spawn(spawn_rpc);
rpc_exit_signal
}