mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-14 10:22:38 +00:00
Remove grpc from beacon_node
This commit is contained in:
@@ -13,7 +13,6 @@ beacon_chain = { path = "../beacon_chain" }
|
||||
store = { path = "../store" }
|
||||
network = { path = "../network" }
|
||||
eth2-libp2p = { path = "../eth2-libp2p" }
|
||||
rpc = { path = "../rpc" }
|
||||
rest_api = { path = "../rest_api" }
|
||||
websocket_server = { path = "../websocket_server" }
|
||||
prometheus = "0.7.0"
|
||||
|
||||
@@ -19,7 +19,6 @@ use genesis::{
|
||||
use lighthouse_bootstrap::Bootstrapper;
|
||||
use lmd_ghost::LmdGhost;
|
||||
use network::{NetworkConfig, NetworkMessage, Service as NetworkService};
|
||||
use rpc::Config as RpcConfig;
|
||||
use slog::{debug, error, info, warn};
|
||||
use std::net::SocketAddr;
|
||||
use std::path::Path;
|
||||
@@ -267,35 +266,6 @@ where
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
/// Immediately starts the gRPC server (gRPC is soon to be deprecated).
|
||||
pub fn grpc_server(mut self, config: &RpcConfig) -> Result<Self, String> {
|
||||
let beacon_chain = self
|
||||
.beacon_chain
|
||||
.clone()
|
||||
.ok_or_else(|| "grpc_server requires a beacon chain")?;
|
||||
let context = self
|
||||
.runtime_context
|
||||
.as_ref()
|
||||
.ok_or_else(|| "grpc_server requires a runtime_context")?
|
||||
.service_context("grpc");
|
||||
let network_send = self
|
||||
.libp2p_network_send
|
||||
.clone()
|
||||
.ok_or_else(|| "grpc_server requires a libp2p network")?;
|
||||
|
||||
let exit_signal = rpc::start_server(
|
||||
config,
|
||||
&context.executor,
|
||||
network_send,
|
||||
beacon_chain,
|
||||
context.log,
|
||||
);
|
||||
|
||||
self.exit_signals.push(exit_signal);
|
||||
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
/// Immediately starts the beacon node REST API http server.
|
||||
pub fn http_server(
|
||||
mut self,
|
||||
@@ -305,7 +275,7 @@ where
|
||||
let beacon_chain = self
|
||||
.beacon_chain
|
||||
.clone()
|
||||
.ok_or_else(|| "grpc_server requires a beacon chain")?;
|
||||
.ok_or_else(|| "http_server requires a beacon chain")?;
|
||||
let context = self
|
||||
.runtime_context
|
||||
.as_ref()
|
||||
@@ -314,11 +284,11 @@ where
|
||||
let network = self
|
||||
.libp2p_network
|
||||
.clone()
|
||||
.ok_or_else(|| "grpc_server requires a libp2p network")?;
|
||||
.ok_or_else(|| "http_server requires a libp2p network")?;
|
||||
let network_send = self
|
||||
.libp2p_network_send
|
||||
.clone()
|
||||
.ok_or_else(|| "grpc_server requires a libp2p network sender")?;
|
||||
.ok_or_else(|| "http_server requires a libp2p network sender")?;
|
||||
|
||||
let network_info = rest_api::NetworkInfo {
|
||||
network_service: network.clone(),
|
||||
|
||||
@@ -53,7 +53,6 @@ pub struct Config {
|
||||
/// via the CLI at runtime, instead of from a configuration file saved to disk.
|
||||
pub genesis: ClientGenesis,
|
||||
pub network: network::NetworkConfig,
|
||||
pub rpc: rpc::Config,
|
||||
pub rest_api: rest_api::Config,
|
||||
pub websocket_server: websocket_server::Config,
|
||||
pub eth1: eth1::Config,
|
||||
@@ -68,7 +67,6 @@ impl Default for Config {
|
||||
db_name: "chain_db".to_string(),
|
||||
genesis: <_>::default(),
|
||||
network: NetworkConfig::new(),
|
||||
rpc: <_>::default(),
|
||||
rest_api: <_>::default(),
|
||||
websocket_server: <_>::default(),
|
||||
spec_constants: TESTNET_SPEC_CONSTANTS.into(),
|
||||
@@ -145,7 +143,6 @@ impl Config {
|
||||
};
|
||||
|
||||
self.network.apply_cli_args(args)?;
|
||||
self.rpc.apply_cli_args(args)?;
|
||||
self.rest_api.apply_cli_args(args)?;
|
||||
self.websocket_server.apply_cli_args(args)?;
|
||||
|
||||
|
||||
@@ -1,23 +0,0 @@
|
||||
[package]
|
||||
name = "rpc"
|
||||
version = "0.1.0"
|
||||
authors = ["Age Manning <Age@AgeManning.com>"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
bls = { path = "../../eth2/utils/bls" }
|
||||
beacon_chain = { path = "../beacon_chain" }
|
||||
network = { path = "../network" }
|
||||
eth2-libp2p = { path = "../eth2-libp2p" }
|
||||
version = { path = "../version" }
|
||||
types = { path = "../../eth2/types" }
|
||||
eth2_ssz = "0.1.2"
|
||||
protos = { path = "../../protos" }
|
||||
grpcio = { version = "0.4.6", default-features = false, features = ["protobuf-codec"] }
|
||||
clap = "2.33.0"
|
||||
futures = "0.1.29"
|
||||
serde = "1.0.102"
|
||||
serde_derive = "1.0.102"
|
||||
slog = { version = "2.5.2", features = ["max_level_trace"] }
|
||||
tokio = "0.1.22"
|
||||
exit-future = "0.1.4"
|
||||
@@ -1,177 +0,0 @@
|
||||
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
|
||||
use eth2_libp2p::PubsubMessage;
|
||||
use eth2_libp2p::Topic;
|
||||
use eth2_libp2p::{BEACON_ATTESTATION_TOPIC, TOPIC_ENCODING_POSTFIX, TOPIC_PREFIX};
|
||||
use futures::Future;
|
||||
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
|
||||
use network::NetworkMessage;
|
||||
use protos::services::{
|
||||
AttestationData as AttestationDataProto, ProduceAttestationDataRequest,
|
||||
ProduceAttestationDataResponse, PublishAttestationRequest, PublishAttestationResponse,
|
||||
};
|
||||
use protos::services_grpc::AttestationService;
|
||||
use slog::{error, info, trace, warn};
|
||||
use ssz::{ssz_encode, Decode, Encode};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
use types::{Attestation, Slot};
|
||||
|
||||
pub struct AttestationServiceInstance<T: BeaconChainTypes> {
|
||||
pub chain: Arc<BeaconChain<T>>,
|
||||
pub network_chan: mpsc::UnboundedSender<NetworkMessage>,
|
||||
pub log: slog::Logger,
|
||||
}
|
||||
|
||||
// NOTE: Deriving Clone puts bogus bounds on T, so we implement it manually.
|
||||
impl<T: BeaconChainTypes> Clone for AttestationServiceInstance<T> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
chain: self.chain.clone(),
|
||||
network_chan: self.network_chan.clone(),
|
||||
log: self.log.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> AttestationService for AttestationServiceInstance<T> {
|
||||
/// Produce the `AttestationData` for signing by a validator.
|
||||
fn produce_attestation_data(
|
||||
&mut self,
|
||||
ctx: RpcContext,
|
||||
req: ProduceAttestationDataRequest,
|
||||
sink: UnarySink<ProduceAttestationDataResponse>,
|
||||
) {
|
||||
trace!(
|
||||
&self.log,
|
||||
"Attempting to produce attestation at slot {}",
|
||||
req.get_slot()
|
||||
);
|
||||
|
||||
// Then get the AttestationData from the beacon chain
|
||||
// NOTE(v0.9): shard is incorrectly named, all this should be deleted
|
||||
let shard = req.get_shard();
|
||||
let slot_requested = req.get_slot();
|
||||
let attestation_data = match self
|
||||
.chain
|
||||
.produce_attestation_data(Slot::from(slot_requested), shard)
|
||||
{
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
// Could not produce an attestation
|
||||
let log_clone = self.log.clone();
|
||||
let f = sink
|
||||
.fail(RpcStatus::new(
|
||||
RpcStatusCode::Unknown,
|
||||
Some(format!("Could not produce an attestation: {:?}", e)),
|
||||
))
|
||||
.map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e));
|
||||
return ctx.spawn(f);
|
||||
}
|
||||
};
|
||||
|
||||
let mut attestation_data_proto = AttestationDataProto::new();
|
||||
attestation_data_proto.set_ssz(ssz_encode(&attestation_data));
|
||||
|
||||
let mut resp = ProduceAttestationDataResponse::new();
|
||||
resp.set_attestation_data(attestation_data_proto);
|
||||
|
||||
let error_log = self.log.clone();
|
||||
let f = sink
|
||||
.success(resp)
|
||||
.map_err(move |e| error!(error_log, "Failed to reply with success {:?}: {:?}", req, e));
|
||||
ctx.spawn(f)
|
||||
}
|
||||
|
||||
/// Accept some fully-formed `FreeAttestation` from the validator,
|
||||
/// store it, and aggregate it into an `Attestation`.
|
||||
fn publish_attestation(
|
||||
&mut self,
|
||||
ctx: RpcContext,
|
||||
req: PublishAttestationRequest,
|
||||
sink: UnarySink<PublishAttestationResponse>,
|
||||
) {
|
||||
trace!(self.log, "Publishing attestation");
|
||||
|
||||
let mut resp = PublishAttestationResponse::new();
|
||||
let ssz_serialized_attestation = req.get_attestation().get_ssz();
|
||||
|
||||
let attestation = match Attestation::from_ssz_bytes(ssz_serialized_attestation) {
|
||||
Ok(v) => v,
|
||||
Err(_) => {
|
||||
let log_clone = self.log.clone();
|
||||
let f = sink
|
||||
.fail(RpcStatus::new(
|
||||
RpcStatusCode::InvalidArgument,
|
||||
Some("Invalid attestation".to_string()),
|
||||
))
|
||||
.map_err(move |_| warn!(log_clone, "failed to reply {:?}", req));
|
||||
return ctx.spawn(f);
|
||||
}
|
||||
};
|
||||
|
||||
match self.chain.process_attestation(attestation.clone()) {
|
||||
Ok(_) => {
|
||||
// Attestation was successfully processed.
|
||||
info!(
|
||||
self.log,
|
||||
"Valid attestation from RPC";
|
||||
"target_epoch" => attestation.data.target.epoch,
|
||||
"index" => attestation.data.index,
|
||||
);
|
||||
|
||||
// valid attestation, propagate to the network
|
||||
let topic_string = format!(
|
||||
"/{}/{}/{}",
|
||||
TOPIC_PREFIX, BEACON_ATTESTATION_TOPIC, TOPIC_ENCODING_POSTFIX
|
||||
);
|
||||
let topic = Topic::new(topic_string);
|
||||
let message = PubsubMessage::Attestation(attestation.as_ssz_bytes());
|
||||
|
||||
self.network_chan
|
||||
.try_send(NetworkMessage::Publish {
|
||||
topics: vec![topic],
|
||||
message,
|
||||
})
|
||||
.unwrap_or_else(|e| {
|
||||
error!(
|
||||
self.log,
|
||||
"Failed to gossip attestation";
|
||||
"error" => format!("{:?}", e)
|
||||
);
|
||||
});
|
||||
|
||||
resp.set_success(true);
|
||||
}
|
||||
Err(BeaconChainError::AttestationValidationError(e)) => {
|
||||
// Attestation was invalid
|
||||
warn!(
|
||||
self.log,
|
||||
"Invalid attestation from RPC";
|
||||
"error" => format!("{:?}", e),
|
||||
);
|
||||
resp.set_success(false);
|
||||
resp.set_msg(format!("InvalidAttestation: {:?}", e).as_bytes().to_vec());
|
||||
}
|
||||
Err(e) => {
|
||||
// Some other error
|
||||
warn!(
|
||||
self.log,
|
||||
"Failed to process attestation from RPC";
|
||||
"error" => format!("{:?}", e),
|
||||
);
|
||||
resp.set_success(false);
|
||||
resp.set_msg(
|
||||
format!("There was a beacon chain error: {:?}", e)
|
||||
.as_bytes()
|
||||
.to_vec(),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
let error_log = self.log.clone();
|
||||
let f = sink
|
||||
.success(resp)
|
||||
.map_err(move |e| error!(error_log, "failed to reply {:?}: {:?}", req, e));
|
||||
ctx.spawn(f)
|
||||
}
|
||||
}
|
||||
@@ -1,185 +0,0 @@
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
|
||||
use eth2_libp2p::{PubsubMessage, Topic};
|
||||
use eth2_libp2p::{BEACON_BLOCK_TOPIC, TOPIC_ENCODING_POSTFIX, TOPIC_PREFIX};
|
||||
use futures::Future;
|
||||
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
|
||||
use network::NetworkMessage;
|
||||
use protos::services::{
|
||||
BeaconBlock as BeaconBlockProto, ProduceBeaconBlockRequest, ProduceBeaconBlockResponse,
|
||||
PublishBeaconBlockRequest, PublishBeaconBlockResponse,
|
||||
};
|
||||
use protos::services_grpc::BeaconBlockService;
|
||||
use slog::Logger;
|
||||
use slog::{error, info, trace, warn};
|
||||
use ssz::{ssz_encode, Decode, Encode};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
use types::{BeaconBlock, Signature, Slot};
|
||||
|
||||
pub struct BeaconBlockServiceInstance<T: BeaconChainTypes> {
|
||||
pub chain: Arc<BeaconChain<T>>,
|
||||
pub network_chan: mpsc::UnboundedSender<NetworkMessage>,
|
||||
pub log: Logger,
|
||||
}
|
||||
|
||||
// NOTE: Deriving Clone puts bogus bounds on T, so we implement it manually.
|
||||
impl<T: BeaconChainTypes> Clone for BeaconBlockServiceInstance<T> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
chain: self.chain.clone(),
|
||||
network_chan: self.network_chan.clone(),
|
||||
log: self.log.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> BeaconBlockService for BeaconBlockServiceInstance<T> {
|
||||
/// Produce a `BeaconBlock` for signing by a validator.
|
||||
fn produce_beacon_block(
|
||||
&mut self,
|
||||
ctx: RpcContext,
|
||||
req: ProduceBeaconBlockRequest,
|
||||
sink: UnarySink<ProduceBeaconBlockResponse>,
|
||||
) {
|
||||
trace!(self.log, "Generating a beacon block"; "req" => format!("{:?}", req));
|
||||
|
||||
// decode the request
|
||||
let requested_slot = Slot::from(req.get_slot());
|
||||
let randao_reveal = match Signature::from_ssz_bytes(req.get_randao_reveal()) {
|
||||
Ok(reveal) => reveal,
|
||||
Err(_) => {
|
||||
// decode error, incorrect signature
|
||||
let log_clone = self.log.clone();
|
||||
let f = sink
|
||||
.fail(RpcStatus::new(
|
||||
RpcStatusCode::InvalidArgument,
|
||||
Some("Invalid randao reveal signature".to_string()),
|
||||
))
|
||||
.map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e));
|
||||
return ctx.spawn(f);
|
||||
}
|
||||
};
|
||||
|
||||
let produced_block = match self.chain.produce_block(randao_reveal, requested_slot) {
|
||||
Ok((block, _state)) => block,
|
||||
Err(e) => {
|
||||
// could not produce a block
|
||||
let log_clone = self.log.clone();
|
||||
warn!(self.log, "RPC Error"; "Error" => format!("Could not produce a block:{:?}",e));
|
||||
let f = sink
|
||||
.fail(RpcStatus::new(
|
||||
RpcStatusCode::Unknown,
|
||||
Some(format!("Could not produce a block: {:?}", e)),
|
||||
))
|
||||
.map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e));
|
||||
return ctx.spawn(f);
|
||||
}
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
produced_block.slot, requested_slot,
|
||||
"should produce at the requested slot"
|
||||
);
|
||||
|
||||
let mut block = BeaconBlockProto::new();
|
||||
block.set_ssz(ssz_encode(&produced_block));
|
||||
|
||||
let mut resp = ProduceBeaconBlockResponse::new();
|
||||
resp.set_block(block);
|
||||
|
||||
let f = sink
|
||||
.success(resp)
|
||||
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
|
||||
ctx.spawn(f)
|
||||
}
|
||||
|
||||
/// Accept some fully-formed `BeaconBlock`, process and publish it.
|
||||
fn publish_beacon_block(
|
||||
&mut self,
|
||||
ctx: RpcContext,
|
||||
req: PublishBeaconBlockRequest,
|
||||
sink: UnarySink<PublishBeaconBlockResponse>,
|
||||
) {
|
||||
trace!(&self.log, "Attempting to publish a block");
|
||||
|
||||
let mut resp = PublishBeaconBlockResponse::new();
|
||||
|
||||
let ssz_serialized_block = req.get_block().get_ssz();
|
||||
|
||||
match BeaconBlock::from_ssz_bytes(ssz_serialized_block) {
|
||||
Ok(block) => {
|
||||
match self.chain.process_block(block.clone()) {
|
||||
Ok(outcome) => {
|
||||
if let BlockProcessingOutcome::Processed { block_root } = outcome {
|
||||
// Block was successfully processed.
|
||||
info!(
|
||||
self.log,
|
||||
"Valid block from RPC";
|
||||
"root" => format!("{}", block_root),
|
||||
"slot" => block.slot,
|
||||
);
|
||||
|
||||
// create the network topic to send on
|
||||
let topic_string = format!(
|
||||
"/{}/{}/{}",
|
||||
TOPIC_PREFIX, BEACON_BLOCK_TOPIC, TOPIC_ENCODING_POSTFIX
|
||||
);
|
||||
let topic = Topic::new(topic_string);
|
||||
let message = PubsubMessage::Block(block.as_ssz_bytes());
|
||||
|
||||
// Publish the block to the p2p network via gossipsub.
|
||||
self.network_chan
|
||||
.try_send(NetworkMessage::Publish {
|
||||
topics: vec![topic],
|
||||
message,
|
||||
})
|
||||
.unwrap_or_else(|e| {
|
||||
error!(
|
||||
self.log,
|
||||
"Failed to gossip beacon block";
|
||||
"error" => format!("{:?}", e)
|
||||
);
|
||||
});
|
||||
|
||||
resp.set_success(true);
|
||||
} else {
|
||||
// Block was not successfully processed.
|
||||
warn!(
|
||||
self.log,
|
||||
"Invalid block from RPC";
|
||||
"outcome" => format!("{:?}", outcome)
|
||||
);
|
||||
|
||||
resp.set_success(false);
|
||||
resp.set_msg(
|
||||
format!("InvalidBlock: {:?}", outcome).as_bytes().to_vec(),
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
// Some failure during processing.
|
||||
error!(
|
||||
self.log,
|
||||
"Failed to process beacon block";
|
||||
"error" => format!("{:?}", e)
|
||||
);
|
||||
|
||||
resp.set_success(false);
|
||||
resp.set_msg(format!("failed_to_process: {:?}", e).as_bytes().to_vec());
|
||||
}
|
||||
}
|
||||
|
||||
resp.set_success(true);
|
||||
}
|
||||
Err(_) => {
|
||||
resp.set_success(false);
|
||||
resp.set_msg(b"Invalid SSZ".to_vec());
|
||||
}
|
||||
};
|
||||
|
||||
let f = sink
|
||||
.success(resp)
|
||||
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
|
||||
ctx.spawn(f)
|
||||
}
|
||||
}
|
||||
@@ -1,58 +0,0 @@
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use futures::Future;
|
||||
use grpcio::{RpcContext, UnarySink};
|
||||
use protos::services::{Empty, Fork, NodeInfoResponse};
|
||||
use protos::services_grpc::BeaconNodeService;
|
||||
use slog::{trace, warn};
|
||||
use std::sync::Arc;
|
||||
|
||||
pub struct BeaconNodeServiceInstance<T: BeaconChainTypes> {
|
||||
pub chain: Arc<BeaconChain<T>>,
|
||||
pub log: slog::Logger,
|
||||
}
|
||||
|
||||
// NOTE: Deriving Clone puts bogus bounds on T, so we implement it manually.
|
||||
impl<T: BeaconChainTypes> Clone for BeaconNodeServiceInstance<T> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
chain: self.chain.clone(),
|
||||
log: self.log.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> BeaconNodeService for BeaconNodeServiceInstance<T> {
|
||||
/// Provides basic node information.
|
||||
fn info(&mut self, ctx: RpcContext, _req: Empty, sink: UnarySink<NodeInfoResponse>) {
|
||||
trace!(self.log, "Node info requested via RPC");
|
||||
|
||||
// build the response
|
||||
let mut node_info = NodeInfoResponse::new();
|
||||
node_info.set_version(version::version());
|
||||
|
||||
// get the chain state
|
||||
let state = &self.chain.head().beacon_state;
|
||||
let state_fork = state.fork.clone();
|
||||
let genesis_time = state.genesis_time;
|
||||
|
||||
// build the rpc fork struct
|
||||
let mut fork = Fork::new();
|
||||
fork.set_previous_version(state_fork.previous_version.to_vec());
|
||||
fork.set_current_version(state_fork.current_version.to_vec());
|
||||
fork.set_epoch(state_fork.epoch.into());
|
||||
|
||||
let spec = &self.chain.spec;
|
||||
|
||||
node_info.set_fork(fork);
|
||||
node_info.set_genesis_time(genesis_time);
|
||||
node_info.set_genesis_slot(spec.genesis_slot.as_u64());
|
||||
node_info.set_network_id(u32::from(spec.network_id));
|
||||
|
||||
// send the node_info the requester
|
||||
let error_log = self.log.clone();
|
||||
let f = sink
|
||||
.success(node_info)
|
||||
.map_err(move |e| warn!(error_log, "failed to reply {:?}", e));
|
||||
ctx.spawn(f)
|
||||
}
|
||||
}
|
||||
@@ -1,44 +0,0 @@
|
||||
use clap::ArgMatches;
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
use std::net::Ipv4Addr;
|
||||
|
||||
/// RPC Configuration
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Config {
|
||||
/// Enable the RPC server.
|
||||
pub enabled: bool,
|
||||
/// The IPv4 address the RPC will listen on.
|
||||
pub listen_address: Ipv4Addr,
|
||||
/// The port the RPC will listen on.
|
||||
pub port: u16,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Config {
|
||||
enabled: true,
|
||||
listen_address: Ipv4Addr::new(127, 0, 0, 1),
|
||||
port: 5051,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Config {
|
||||
pub fn apply_cli_args(&mut self, args: &ArgMatches) -> Result<(), &'static str> {
|
||||
if args.is_present("no-grpc") {
|
||||
self.enabled = false;
|
||||
}
|
||||
|
||||
if let Some(rpc_address) = args.value_of("rpc-address") {
|
||||
self.listen_address = rpc_address
|
||||
.parse::<Ipv4Addr>()
|
||||
.map_err(|_| "rpc-address is not IPv4 address")?;
|
||||
}
|
||||
|
||||
if let Some(rpc_port) = args.value_of("rpc-port") {
|
||||
self.port = rpc_port.parse::<u16>().map_err(|_| "rpc-port is not u16")?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -1,101 +0,0 @@
|
||||
mod attestation;
|
||||
mod beacon_block;
|
||||
mod beacon_node;
|
||||
pub mod config;
|
||||
mod validator;
|
||||
|
||||
use self::attestation::AttestationServiceInstance;
|
||||
use self::beacon_block::BeaconBlockServiceInstance;
|
||||
use self::beacon_node::BeaconNodeServiceInstance;
|
||||
use self::validator::ValidatorServiceInstance;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
pub use config::Config;
|
||||
use futures::Future;
|
||||
use grpcio::{Environment, ServerBuilder};
|
||||
use network::NetworkMessage;
|
||||
use protos::services_grpc::{
|
||||
create_attestation_service, create_beacon_block_service, create_beacon_node_service,
|
||||
create_validator_service,
|
||||
};
|
||||
use slog::{info, warn};
|
||||
use std::sync::Arc;
|
||||
use tokio::runtime::TaskExecutor;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
pub fn start_server<T: BeaconChainTypes>(
|
||||
config: &Config,
|
||||
executor: &TaskExecutor,
|
||||
network_chan: mpsc::UnboundedSender<NetworkMessage>,
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
log: slog::Logger,
|
||||
) -> exit_future::Signal {
|
||||
let env = Arc::new(Environment::new(1));
|
||||
|
||||
// build a channel to kill the rpc server
|
||||
let (rpc_exit_signal, rpc_exit) = exit_future::signal();
|
||||
|
||||
// build the individual rpc services
|
||||
let beacon_node_service = {
|
||||
let instance = BeaconNodeServiceInstance {
|
||||
chain: beacon_chain.clone(),
|
||||
log: log.clone(),
|
||||
};
|
||||
create_beacon_node_service(instance)
|
||||
};
|
||||
|
||||
let beacon_block_service = {
|
||||
let instance = BeaconBlockServiceInstance {
|
||||
chain: beacon_chain.clone(),
|
||||
network_chan: network_chan.clone(),
|
||||
log: log.clone(),
|
||||
};
|
||||
create_beacon_block_service(instance)
|
||||
};
|
||||
let validator_service = {
|
||||
let instance = ValidatorServiceInstance {
|
||||
chain: beacon_chain.clone(),
|
||||
log: log.clone(),
|
||||
};
|
||||
create_validator_service(instance)
|
||||
};
|
||||
let attestation_service = {
|
||||
let instance = AttestationServiceInstance {
|
||||
network_chan,
|
||||
chain: beacon_chain.clone(),
|
||||
log: log.clone(),
|
||||
};
|
||||
create_attestation_service(instance)
|
||||
};
|
||||
|
||||
let mut server = ServerBuilder::new(env)
|
||||
.register_service(beacon_block_service)
|
||||
.register_service(validator_service)
|
||||
.register_service(beacon_node_service)
|
||||
.register_service(attestation_service)
|
||||
.bind(config.listen_address.to_string(), config.port)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let spawn_rpc = {
|
||||
server.start();
|
||||
for &(ref host, port) in server.bind_addrs() {
|
||||
info!(
|
||||
log,
|
||||
"gRPC API started";
|
||||
"port" => port,
|
||||
"host" => host,
|
||||
);
|
||||
}
|
||||
rpc_exit.and_then(move |_| {
|
||||
info!(log, "RPC Server shutting down");
|
||||
server
|
||||
.shutdown()
|
||||
.wait()
|
||||
.map(|_| ())
|
||||
.map_err(|e| warn!(log, "RPC server failed to shutdown: {:?}", e))?;
|
||||
Ok(())
|
||||
})
|
||||
};
|
||||
executor.spawn(spawn_rpc);
|
||||
rpc_exit_signal
|
||||
}
|
||||
@@ -1,185 +0,0 @@
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use bls::PublicKey;
|
||||
use futures::Future;
|
||||
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
|
||||
use protos::services::{ActiveValidator, GetDutiesRequest, GetDutiesResponse, ValidatorDuty};
|
||||
use protos::services_grpc::ValidatorService;
|
||||
use slog::{trace, warn};
|
||||
use ssz::Decode;
|
||||
use std::sync::Arc;
|
||||
use types::{Epoch, EthSpec, RelativeEpoch};
|
||||
|
||||
pub struct ValidatorServiceInstance<T: BeaconChainTypes> {
|
||||
pub chain: Arc<BeaconChain<T>>,
|
||||
pub log: slog::Logger,
|
||||
}
|
||||
|
||||
// NOTE: Deriving Clone puts bogus bounds on T, so we implement it manually.
|
||||
impl<T: BeaconChainTypes> Clone for ValidatorServiceInstance<T> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
chain: self.chain.clone(),
|
||||
log: self.log.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> ValidatorService for ValidatorServiceInstance<T> {
|
||||
/// For a list of validator public keys, this function returns the slot at which each
|
||||
/// validator must propose a block, attest to a shard, their shard committee and the shard they
|
||||
/// need to attest to.
|
||||
fn get_validator_duties(
|
||||
&mut self,
|
||||
ctx: RpcContext,
|
||||
req: GetDutiesRequest,
|
||||
sink: UnarySink<GetDutiesResponse>,
|
||||
) {
|
||||
trace!(self.log, "RPC request"; "endpoint" => "GetValidatorDuties", "epoch" => req.get_epoch());
|
||||
let validators = req.get_validators();
|
||||
|
||||
let epoch = Epoch::from(req.get_epoch());
|
||||
let slot = epoch.start_slot(T::EthSpec::slots_per_epoch());
|
||||
|
||||
let mut state = if let Ok(state) = self.chain.state_at_slot(slot) {
|
||||
state.clone()
|
||||
} else {
|
||||
let log_clone = self.log.clone();
|
||||
let f = sink
|
||||
.fail(RpcStatus::new(
|
||||
RpcStatusCode::FailedPrecondition,
|
||||
Some("No state".to_string()),
|
||||
))
|
||||
.map_err(move |e| warn!(log_clone, "failed to reply {:?}: {:?}", req, e));
|
||||
return ctx.spawn(f);
|
||||
};
|
||||
|
||||
let _ = state.build_all_caches(&self.chain.spec);
|
||||
|
||||
assert_eq!(
|
||||
state.current_epoch(),
|
||||
epoch,
|
||||
"Retrieved state should be from the same epoch"
|
||||
);
|
||||
|
||||
let mut resp = GetDutiesResponse::new();
|
||||
let resp_validators = resp.mut_active_validators();
|
||||
|
||||
let validator_proposers: Result<Vec<usize>, _> = epoch
|
||||
.slot_iter(T::EthSpec::slots_per_epoch())
|
||||
.map(|slot| state.get_beacon_proposer_index(slot, &self.chain.spec))
|
||||
.collect();
|
||||
let validator_proposers = match validator_proposers {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
// could not get the validator proposer index
|
||||
let log_clone = self.log.clone();
|
||||
let f = sink
|
||||
.fail(RpcStatus::new(
|
||||
RpcStatusCode::FailedPrecondition,
|
||||
Some(format!("Could not find beacon proposers: {:?}", e)),
|
||||
))
|
||||
.map_err(move |e| warn!(log_clone, "failed to reply {:?} : {:?}", req, e));
|
||||
return ctx.spawn(f);
|
||||
}
|
||||
};
|
||||
|
||||
// get the duties for each validator
|
||||
for validator_pk in validators.get_public_keys() {
|
||||
let mut active_validator = ActiveValidator::new();
|
||||
|
||||
let public_key = match PublicKey::from_ssz_bytes(validator_pk) {
|
||||
Ok(v) => v,
|
||||
Err(_) => {
|
||||
let log_clone = self.log.clone();
|
||||
let f = sink
|
||||
.fail(RpcStatus::new(
|
||||
RpcStatusCode::InvalidArgument,
|
||||
Some("Invalid public_key".to_string()),
|
||||
))
|
||||
.map_err(move |_| warn!(log_clone, "failed to reply {:?}", req));
|
||||
return ctx.spawn(f);
|
||||
}
|
||||
};
|
||||
|
||||
// get the validator index
|
||||
let val_index = match state.get_validator_index(&public_key) {
|
||||
Ok(Some(index)) => index,
|
||||
Ok(None) => {
|
||||
// index not present in registry, set the duties for this key to None
|
||||
warn!(
|
||||
self.log,
|
||||
"RPC requested a public key that is not in the registry: {:?}", public_key
|
||||
);
|
||||
active_validator.set_none(false);
|
||||
resp_validators.push(active_validator);
|
||||
continue;
|
||||
}
|
||||
// the cache is not built, throw an error
|
||||
Err(e) => {
|
||||
let log_clone = self.log.clone();
|
||||
let f = sink
|
||||
.fail(RpcStatus::new(
|
||||
RpcStatusCode::FailedPrecondition,
|
||||
Some(format!("Beacon state error {:?}", e)),
|
||||
))
|
||||
.map_err(move |e| warn!(log_clone, "Failed to reply {:?}: {:?}", req, e));
|
||||
return ctx.spawn(f);
|
||||
}
|
||||
};
|
||||
|
||||
// get attestation duties and check if validator is active
|
||||
let attestation_duties = match state
|
||||
.get_attestation_duties(val_index, RelativeEpoch::Current)
|
||||
{
|
||||
Ok(Some(v)) => v,
|
||||
Ok(_) => {
|
||||
// validator is inactive, go to the next validator
|
||||
warn!(
|
||||
self.log,
|
||||
"RPC requested an inactive validator key: {:?}", public_key
|
||||
);
|
||||
active_validator.set_none(false);
|
||||
resp_validators.push(active_validator);
|
||||
continue;
|
||||
}
|
||||
// the cache is not built, throw an error
|
||||
Err(e) => {
|
||||
let log_clone = self.log.clone();
|
||||
let f = sink
|
||||
.fail(RpcStatus::new(
|
||||
RpcStatusCode::FailedPrecondition,
|
||||
Some(format!("Beacon state error {:?}", e)),
|
||||
))
|
||||
.map_err(move |e| warn!(log_clone, "Failed to reply {:?}: {:?}", req, e));
|
||||
return ctx.spawn(f);
|
||||
}
|
||||
};
|
||||
|
||||
// we have an active validator, set its duties
|
||||
let mut duty = ValidatorDuty::new();
|
||||
|
||||
// check if the validator needs to propose a block
|
||||
if let Some(slot) = validator_proposers.iter().position(|&v| val_index == v) {
|
||||
duty.set_block_production_slot(
|
||||
epoch.start_slot(T::EthSpec::slots_per_epoch()).as_u64() + slot as u64,
|
||||
);
|
||||
} else {
|
||||
// no blocks to propose this epoch
|
||||
duty.set_none(false)
|
||||
}
|
||||
|
||||
duty.set_committee_index(attestation_duties.committee_position as u64);
|
||||
duty.set_attestation_slot(attestation_duties.slot.as_u64());
|
||||
duty.set_attestation_shard(attestation_duties.index);
|
||||
duty.set_committee_len(attestation_duties.committee_len as u64);
|
||||
|
||||
active_validator.set_duty(duty);
|
||||
resp_validators.push(active_validator);
|
||||
}
|
||||
|
||||
let f = sink
|
||||
.success(resp)
|
||||
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
|
||||
ctx.spawn(f)
|
||||
}
|
||||
}
|
||||
@@ -587,7 +587,6 @@ impl ConfigBuilder {
|
||||
.map_err(|e| format!("Unable to parse default listen address: {:?}", e))?;
|
||||
|
||||
self.client_config.network.listen_address = addr.into();
|
||||
self.client_config.rpc.listen_address = addr;
|
||||
self.client_config.rest_api.listen_address = addr;
|
||||
|
||||
Ok(())
|
||||
@@ -607,7 +606,6 @@ impl ConfigBuilder {
|
||||
|
||||
self.client_config.network.libp2p_port += bump;
|
||||
self.client_config.network.discovery_port += bump;
|
||||
self.client_config.rpc.port += bump;
|
||||
self.client_config.rest_api.port += bump;
|
||||
self.client_config.websocket_server.port += bump;
|
||||
}
|
||||
|
||||
@@ -125,7 +125,6 @@ impl<E: EthSpec> ProductionBeaconNode<E> {
|
||||
.build_beacon_chain()?
|
||||
.libp2p_network(&client_config.network)?
|
||||
.http_server(&client_config, &http_eth2_config)?
|
||||
.grpc_server(&client_config.rpc)?
|
||||
.peer_count_notifier()?
|
||||
.slot_notifier()?;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user