From 8dd07dd7d2d8bf16c2f571732bd91e9f2ec8e1c9 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 25 May 2019 14:31:13 +1000 Subject: [PATCH 01/10] Add http server to beacon node w/ hello world --- Cargo.toml | 1 + beacon_node/client/Cargo.toml | 1 + beacon_node/client/src/client_config.rs | 4 +- beacon_node/client/src/lib.rs | 13 ++- beacon_node/http_server/Cargo.toml | 31 +++++++ beacon_node/http_server/src/lib.rs | 115 ++++++++++++++++++++++++ 6 files changed, 163 insertions(+), 2 deletions(-) create mode 100644 beacon_node/http_server/Cargo.toml create mode 100644 beacon_node/http_server/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 00c3543091..b4d53d4206 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ members = [ "beacon_node", "beacon_node/store", "beacon_node/client", + "beacon_node/http_server", "beacon_node/network", "beacon_node/eth2-libp2p", "beacon_node/rpc", diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index 4a976eec43..6634e260d5 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" beacon_chain = { path = "../beacon_chain" } network = { path = "../network" } store = { path = "../store" } +http_server = { path = "../http_server" } rpc = { path = "../rpc" } fork_choice = { path = "../../eth2/fork_choice" } types = { path = "../../eth2/types" } diff --git a/beacon_node/client/src/client_config.rs b/beacon_node/client/src/client_config.rs index 8d7176c2cf..32722e1245 100644 --- a/beacon_node/client/src/client_config.rs +++ b/beacon_node/client/src/client_config.rs @@ -1,5 +1,6 @@ use clap::ArgMatches; use fork_choice::ForkChoiceAlgorithm; +use http_server::HttpServerConfig; use network::NetworkConfig; use slog::error; use std::fs; @@ -27,7 +28,7 @@ pub struct ClientConfig { pub db_type: DBType, pub db_name: PathBuf, pub rpc_conf: rpc::RPCConfig, - //pub ipc_conf: + pub http_conf: HttpServerConfig, //pub ipc_conf: } impl Default for ClientConfig { @@ -55,6 +56,7 @@ impl Default for ClientConfig { // default db name for disk-based dbs db_name: data_dir.join("chain_db"), rpc_conf: rpc::RPCConfig::default(), + http_conf: HttpServerConfig::default(), } } } diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 71d4013d33..6433b94e2b 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -98,7 +98,7 @@ impl Client { Some(rpc::start_server( &config.rpc_conf, executor, - network_send, + network_send.clone(), beacon_chain.clone(), &log, )) @@ -106,6 +106,17 @@ impl Client { None }; + // Start the `http_server` service. + // + // Note: presently we are ignoring the config and _always_ starting a HTTP server. + http_server::start_service( + &config.http_conf, + executor, + network_send, + beacon_chain.clone(), + &log, + ); + let (slot_timer_exit_signal, exit) = exit_future::signal(); if let Ok(Some(duration_to_next_slot)) = beacon_chain.slot_clock.duration_to_next_slot() { // set up the validator work interval - start at next slot and proceed every slot diff --git a/beacon_node/http_server/Cargo.toml b/beacon_node/http_server/Cargo.toml new file mode 100644 index 0000000000..5d5d7e492d --- /dev/null +++ b/beacon_node/http_server/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "http_server" +version = "0.1.0" +authors = ["Paul Hauner "] +edition = "2018" + +[dependencies] +bls = { path = "../../eth2/utils/bls" } +beacon_chain = { path = "../beacon_chain" } +iron = "^0.6" +router = "^0.6" +network = { path = "../network" } +eth2-libp2p = { path = "../eth2-libp2p" } +version = { path = "../version" } +types = { path = "../../eth2/types" } +ssz = { path = "../../eth2/utils/ssz" } +slot_clock = { path = "../../eth2/utils/slot_clock" } +protos = { path = "../../protos" } +fork_choice = { path = "../../eth2/fork_choice" } +grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] } +protobuf = "2.0.2" +clap = "2.32.0" +store = { path = "../store" } +dirs = "1.0.3" +futures = "0.1.23" +slog = "^2.2.3" +slog-term = "^2.4.0" +slog-async = "^2.3.0" +tokio = "0.1.17" +exit-future = "0.1.4" +crossbeam-channel = "0.3.8" diff --git a/beacon_node/http_server/src/lib.rs b/beacon_node/http_server/src/lib.rs new file mode 100644 index 0000000000..676928ce68 --- /dev/null +++ b/beacon_node/http_server/src/lib.rs @@ -0,0 +1,115 @@ +use beacon_chain::BeaconChain; +use futures::Future; +use grpcio::{Environment, ServerBuilder}; +use network::NetworkMessage; +use protos::services_grpc::{ + create_attestation_service, create_beacon_block_service, create_beacon_node_service, + create_validator_service, +}; +use slog::{info, o, warn}; +use std::net::Ipv4Addr; +use std::sync::Arc; +use tokio::runtime::TaskExecutor; +use types::EthSpec; + +use iron::prelude::*; +use iron::{status::Status, Handler, IronResult, Request, Response}; +use router::Router; + +#[derive(PartialEq, Clone, Debug)] +pub struct HttpServerConfig { + pub enabled: bool, + pub listen_address: String, + /* + pub listen_address: Ipv4Addr, + pub port: u16, + */ +} + +impl Default for HttpServerConfig { + fn default() -> Self { + Self { + enabled: false, + listen_address: "127.0.0.1:5051".to_string(), + /* + listen_address: Ipv4Addr::new(127, 0, 0, 1), + port: 5051, + */ + } + } +} + +pub struct IndexHandler { + message: String, +} + +impl Handler for IndexHandler { + fn handle(&self, _: &mut Request) -> IronResult { + Ok(Response::with((Status::Ok, self.message.clone()))) + } +} + +pub fn create_iron_http_server() -> Iron { + let index_handler = IndexHandler { + message: "Hello world".to_string(), + }; + + let mut router = Router::new(); + router.get("/", index_handler, "index"); + Iron::new(router) +} + +pub fn start_service( + config: &HttpServerConfig, + executor: &TaskExecutor, + network_chan: crossbeam_channel::Sender, + beacon_chain: Arc>, + log: &slog::Logger, +) -> exit_future::Signal +where + T: store::Store, + U: slot_clock::SlotClock, + F: fork_choice::ForkChoice, + E: EthSpec, +{ + let log = log.new(o!("Service"=>"RPC")); + let env = Arc::new(Environment::new(1)); + + // Create: + // - `shutdown_trigger` a one-shot to shut down this service. + // - `wait_for_shutdown` a future that will wait until someone calls shutdown. + let (shutdown_trigger, wait_for_shutdown) = exit_future::signal(); + + let iron = create_iron_http_server(); + + let spawn_rpc = { + let result = iron.http(config.listen_address.clone()); + + if result.is_ok() { + info!(log, "HTTP server running on {}", config.listen_address); + } else { + warn!( + log, + "HTTP server failed to start on {}", config.listen_address + ); + } + + wait_for_shutdown.and_then(move |_| { + info!(log, "HTTP server shutting down"); + + // TODO: shutdown server. + /* + server + .shutdown() + .wait() + .map(|_| ()) + .map_err(|e| warn!(log, "RPC server failed to shutdown: {:?}", e))?; + Ok(()) + */ + info!(log, "HTTP server exited"); + Ok(()) + }) + }; + executor.spawn(spawn_rpc); + shutdown_trigger +} From 596ff5178bbf052c10fdfd999d7748ccd57c422d Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 25 May 2019 16:17:48 +1000 Subject: [PATCH 02/10] Add http server shutdown, tidy --- beacon_node/http_server/src/lib.rs | 43 +++++++++++++++--------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/beacon_node/http_server/src/lib.rs b/beacon_node/http_server/src/lib.rs index 676928ce68..b230f924fe 100644 --- a/beacon_node/http_server/src/lib.rs +++ b/beacon_node/http_server/src/lib.rs @@ -1,17 +1,10 @@ use beacon_chain::BeaconChain; use futures::Future; -use grpcio::{Environment, ServerBuilder}; use network::NetworkMessage; -use protos::services_grpc::{ - create_attestation_service, create_beacon_block_service, create_beacon_node_service, - create_validator_service, -}; use slog::{info, o, warn}; -use std::net::Ipv4Addr; use std::sync::Arc; use tokio::runtime::TaskExecutor; use types::EthSpec; - use iron::prelude::*; use iron::{status::Status, Handler, IronResult, Request, Response}; use router::Router; @@ -62,8 +55,8 @@ pub fn create_iron_http_server() -> Iron { pub fn start_service( config: &HttpServerConfig, executor: &TaskExecutor, - network_chan: crossbeam_channel::Sender, - beacon_chain: Arc>, + _network_chan: crossbeam_channel::Sender, + _beacon_chain: Arc>, log: &slog::Logger, ) -> exit_future::Signal where @@ -73,19 +66,20 @@ where E: EthSpec, { let log = log.new(o!("Service"=>"RPC")); - let env = Arc::new(Environment::new(1)); // Create: // - `shutdown_trigger` a one-shot to shut down this service. // - `wait_for_shutdown` a future that will wait until someone calls shutdown. let (shutdown_trigger, wait_for_shutdown) = exit_future::signal(); + // Create an `iron` http, without starting it yet. let iron = create_iron_http_server(); let spawn_rpc = { - let result = iron.http(config.listen_address.clone()); + // Start the HTTP server + let server_start_result = iron.http(config.listen_address.clone()); - if result.is_ok() { + if server_start_result.is_ok() { info!(log, "HTTP server running on {}", config.listen_address); } else { warn!( @@ -94,19 +88,24 @@ where ); } + // Build a future that will shutdown the HTTP server when the `shutdown_trigger` is + // triggered. wait_for_shutdown.and_then(move |_| { info!(log, "HTTP server shutting down"); - // TODO: shutdown server. - /* - server - .shutdown() - .wait() - .map(|_| ()) - .map_err(|e| warn!(log, "RPC server failed to shutdown: {:?}", e))?; - Ok(()) - */ - info!(log, "HTTP server exited"); + if let Ok(mut server) = server_start_result { + // According to the documentation, this function "doesn't work" and the server + // keeps listening. + // + // It is being called anyway, because it seems like the right thing to do. If you + // know this has negative side-effects, please create an issue to discuss. + // + // See: https://docs.rs/iron/0.6.0/iron/struct.Listening.html#impl + match server.close() { + _=> () + }; + } + info!(log, "HTTP server shutdown complete."); Ok(()) }) }; From 85211ebccd884b09336d42464d69bb4b22484aae Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 25 May 2019 17:25:21 +1000 Subject: [PATCH 03/10] Add basic prometheus endpoint --- beacon_node/client/src/lib.rs | 7 ++- beacon_node/http_server/Cargo.toml | 1 + beacon_node/http_server/src/lib.rs | 51 ++++++++++++++----- .../http_server/src/prometheus_handler.rs | 43 ++++++++++++++++ 4 files changed, 86 insertions(+), 16 deletions(-) create mode 100644 beacon_node/http_server/src/prometheus_handler.rs diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 6433b94e2b..9445799d57 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -35,6 +35,8 @@ pub struct Client { pub network: Arc>, /// Signal to terminate the RPC server. pub rpc_exit_signal: Option, + /// Signal to terminate the HTTP server. + pub http_exit_signal: Option, /// Signal to terminate the slot timer. pub slot_timer_exit_signal: Option, /// The clients logger. @@ -109,13 +111,13 @@ impl Client { // Start the `http_server` service. // // Note: presently we are ignoring the config and _always_ starting a HTTP server. - http_server::start_service( + let http_exit_signal = Some(http_server::start_service( &config.http_conf, executor, network_send, beacon_chain.clone(), &log, - ); + )); let (slot_timer_exit_signal, exit) = exit_future::signal(); if let Ok(Some(duration_to_next_slot)) = beacon_chain.slot_clock.duration_to_next_slot() { @@ -146,6 +148,7 @@ impl Client { Ok(Client { _config: config, _beacon_chain: beacon_chain, + http_exit_signal, rpc_exit_signal, slot_timer_exit_signal: Some(slot_timer_exit_signal), log, diff --git a/beacon_node/http_server/Cargo.toml b/beacon_node/http_server/Cargo.toml index 5d5d7e492d..6f4579d175 100644 --- a/beacon_node/http_server/Cargo.toml +++ b/beacon_node/http_server/Cargo.toml @@ -19,6 +19,7 @@ protos = { path = "../../protos" } fork_choice = { path = "../../eth2/fork_choice" } grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] } protobuf = "2.0.2" +prometheus = "^0.6" clap = "2.32.0" store = { path = "../store" } dirs = "1.0.3" diff --git a/beacon_node/http_server/src/lib.rs b/beacon_node/http_server/src/lib.rs index b230f924fe..b2c3a86fc3 100644 --- a/beacon_node/http_server/src/lib.rs +++ b/beacon_node/http_server/src/lib.rs @@ -1,13 +1,16 @@ +mod prometheus_handler; + use beacon_chain::BeaconChain; use futures::Future; +use iron::prelude::*; +use iron::{status::Status, Handler, IronResult, Request, Response}; use network::NetworkMessage; +use prometheus_handler::PrometheusHandler; +use router::Router; use slog::{info, o, warn}; use std::sync::Arc; use tokio::runtime::TaskExecutor; use types::EthSpec; -use iron::prelude::*; -use iron::{status::Status, Handler, IronResult, Request, Response}; -use router::Router; #[derive(PartialEq, Clone, Debug)] pub struct HttpServerConfig { @@ -42,13 +45,25 @@ impl Handler for IndexHandler { } } -pub fn create_iron_http_server() -> Iron { +pub fn create_iron_http_server( + beacon_chain: Arc>, +) -> Iron +where + T: store::Store + 'static, + U: slot_clock::SlotClock + 'static, + F: fork_choice::ForkChoice + 'static, + E: EthSpec + 'static, +{ let index_handler = IndexHandler { message: "Hello world".to_string(), }; + let prom_handler = PrometheusHandler { + beacon_chain: beacon_chain, + }; let mut router = Router::new(); router.get("/", index_handler, "index"); + router.get("/prometheus/", prom_handler, "prometheus"); Iron::new(router) } @@ -56,16 +71,16 @@ pub fn start_service( config: &HttpServerConfig, executor: &TaskExecutor, _network_chan: crossbeam_channel::Sender, - _beacon_chain: Arc>, + beacon_chain: Arc>, log: &slog::Logger, ) -> exit_future::Signal where - T: store::Store, - U: slot_clock::SlotClock, - F: fork_choice::ForkChoice, - E: EthSpec, + T: store::Store + 'static, + U: slot_clock::SlotClock + 'static, + F: fork_choice::ForkChoice + 'static, + E: EthSpec + 'static, { - let log = log.new(o!("Service"=>"RPC")); + let log = log.new(o!("Service"=>"HTTP")); // Create: // - `shutdown_trigger` a one-shot to shut down this service. @@ -73,9 +88,14 @@ where let (shutdown_trigger, wait_for_shutdown) = exit_future::signal(); // Create an `iron` http, without starting it yet. - let iron = create_iron_http_server(); + let iron = create_iron_http_server(beacon_chain); - let spawn_rpc = { + // Create a HTTP server future. + // + // 1. Start the HTTP server + // 2. Build an exit future that will shutdown the server when requested. + // 3. Return the exit future, so the caller may shutdown the service when desired. + let http_service = { // Start the HTTP server let server_start_result = iron.http(config.listen_address.clone()); @@ -102,13 +122,16 @@ where // // See: https://docs.rs/iron/0.6.0/iron/struct.Listening.html#impl match server.close() { - _=> () + _ => (), }; } info!(log, "HTTP server shutdown complete."); Ok(()) }) }; - executor.spawn(spawn_rpc); + + // Attach the HTTP server to the executor. + executor.spawn(http_service); + shutdown_trigger } diff --git a/beacon_node/http_server/src/prometheus_handler.rs b/beacon_node/http_server/src/prometheus_handler.rs new file mode 100644 index 0000000000..cf577a9eb4 --- /dev/null +++ b/beacon_node/http_server/src/prometheus_handler.rs @@ -0,0 +1,43 @@ +use beacon_chain::BeaconChain; +use iron::{status::Status, Handler, IronResult, Request, Response}; +use prometheus::{IntCounter, Encoder, Opts, Registry, TextEncoder}; +use std::sync::Arc; +use types::EthSpec; + +pub struct PrometheusHandler { + pub beacon_chain: Arc>, +} + +impl PrometheusHandler where E: EthSpec {} + +impl Handler for PrometheusHandler +where + E: EthSpec + 'static, + U: slot_clock::SlotClock + Send + Sync + 'static, + T: Send + Sync + 'static, + F: Send + Sync + 'static, +{ + fn handle(&self, _: &mut Request) -> IronResult { + // Create a Counter. + let counter_opts = Opts::new("present_slot", "direct_slot_clock_reading"); + let counter = IntCounter::with_opts(counter_opts).unwrap(); + + // Create a Registry and register Counter. + let r = Registry::new(); + r.register(Box::new(counter.clone())).unwrap(); + + if let Ok(Some(slot)) = self.beacon_chain.slot_clock.present_slot() { + counter.inc_by(slot.as_u64() as i64); + } + + // Gather the metrics. + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + let metric_families = r.gather(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + + let prom_string = String::from_utf8(buffer).unwrap(); + + Ok(Response::with((Status::Ok, prom_string))) + } +} From 45e3a1759cb75fb732a300486038ad26882e738c Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 25 May 2019 17:56:53 +1000 Subject: [PATCH 04/10] Add slot to prometheus endpoint --- .../http_server/src/prometheus_handler.rs | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/beacon_node/http_server/src/prometheus_handler.rs b/beacon_node/http_server/src/prometheus_handler.rs index cf577a9eb4..ae84d48713 100644 --- a/beacon_node/http_server/src/prometheus_handler.rs +++ b/beacon_node/http_server/src/prometheus_handler.rs @@ -1,8 +1,8 @@ use beacon_chain::BeaconChain; use iron::{status::Status, Handler, IronResult, Request, Response}; -use prometheus::{IntCounter, Encoder, Opts, Registry, TextEncoder}; +use prometheus::{Encoder, IntCounter, Opts, Registry, TextEncoder}; use std::sync::Arc; -use types::EthSpec; +use types::{EthSpec, Slot}; pub struct PrometheusHandler { pub beacon_chain: Arc>, @@ -18,17 +18,19 @@ where F: Send + Sync + 'static, { fn handle(&self, _: &mut Request) -> IronResult { - // Create a Counter. - let counter_opts = Opts::new("present_slot", "direct_slot_clock_reading"); - let counter = IntCounter::with_opts(counter_opts).unwrap(); - - // Create a Registry and register Counter. let r = Registry::new(); - r.register(Box::new(counter.clone())).unwrap(); - if let Ok(Some(slot)) = self.beacon_chain.slot_clock.present_slot() { - counter.inc_by(slot.as_u64() as i64); - } + let present_slot = if let Ok(Some(slot)) = self.beacon_chain.slot_clock.present_slot() { + slot + } else { + Slot::new(0) + }; + register_and_set_slot( + &r, + "present_slot", + "direct_slock_clock_reading", + present_slot, + ); // Gather the metrics. let mut buffer = vec![]; @@ -41,3 +43,10 @@ where Ok(Response::with((Status::Ok, prom_string))) } } + +fn register_and_set_slot(registry: &Registry, name: &str, help: &str, slot: Slot) { + let counter_opts = Opts::new(name, help); + let counter = IntCounter::with_opts(counter_opts).unwrap(); + registry.register(Box::new(counter.clone())).unwrap(); + counter.inc_by(slot.as_u64() as i64); +} From ee8d13573fd90386a50b7a4a953c3ae810dfa0aa Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 25 May 2019 20:51:15 +1000 Subject: [PATCH 05/10] Create `BeaconChainTypes`, thread through runtime --- beacon_node/beacon_chain/src/beacon_chain.rs | 61 +++---- beacon_node/beacon_chain/src/initialise.rs | 153 ------------------ beacon_node/beacon_chain/src/lib.rs | 6 +- .../beacon_chain/src/test_utils/mod.rs | 3 - .../testing_beacon_chain_builder.rs | 49 ------ beacon_node/client/Cargo.toml | 1 + beacon_node/client/src/beacon_chain_types.rs | 109 +++++++++++++ beacon_node/client/src/client_types.rs | 65 -------- beacon_node/client/src/lib.rs | 33 ++-- beacon_node/client/src/notifier.rs | 8 +- beacon_node/http_server/src/lib.rs | 27 +--- .../http_server/src/prometheus_handler.rs | 19 +-- beacon_node/network/src/beacon_chain.rs | 27 ++-- beacon_node/network/src/message_handler.rs | 13 +- beacon_node/network/src/service.rs | 12 +- beacon_node/network/src/sync/import_queue.rs | 12 +- beacon_node/network/src/sync/simple_sync.rs | 18 +-- beacon_node/rpc/src/attestation.rs | 10 +- beacon_node/rpc/src/beacon_block.rs | 10 +- beacon_node/rpc/src/beacon_chain.rs | 27 ++-- beacon_node/rpc/src/beacon_node.rs | 9 +- beacon_node/rpc/src/lib.rs | 7 +- beacon_node/rpc/src/validator.rs | 10 +- beacon_node/src/run.rs | 17 +- 24 files changed, 254 insertions(+), 452 deletions(-) delete mode 100644 beacon_node/beacon_chain/src/initialise.rs delete mode 100644 beacon_node/beacon_chain/src/test_utils/mod.rs delete mode 100644 beacon_node/beacon_chain/src/test_utils/testing_beacon_chain_builder.rs create mode 100644 beacon_node/client/src/beacon_chain_types.rs delete mode 100644 beacon_node/client/src/client_types.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index f2c4b3dbea..9f08b6f644 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -79,32 +79,33 @@ impl BlockProcessingOutcome { } } -pub struct BeaconChain { - pub store: Arc, - pub slot_clock: U, - pub op_pool: OperationPool, - canonical_head: RwLock>, - finalized_head: RwLock>, - pub state: RwLock>, - pub spec: ChainSpec, - pub fork_choice: RwLock, +pub trait BeaconChainTypes { + type Store: store::Store; + type SlotClock: slot_clock::SlotClock; + type ForkChoice: fork_choice::ForkChoice; + type EthSpec: types::EthSpec; } -impl BeaconChain -where - T: Store, - U: SlotClock, - F: ForkChoice, - E: EthSpec, -{ +pub struct BeaconChain { + pub store: Arc, + pub slot_clock: T::SlotClock, + pub op_pool: OperationPool, + canonical_head: RwLock>, + finalized_head: RwLock>, + pub state: RwLock>, + pub spec: ChainSpec, + pub fork_choice: RwLock, +} + +impl BeaconChain { /// Instantiate a new Beacon Chain, from genesis. pub fn from_genesis( - store: Arc, - slot_clock: U, - mut genesis_state: BeaconState, + store: Arc, + slot_clock: T::SlotClock, + mut genesis_state: BeaconState, genesis_block: BeaconBlock, spec: ChainSpec, - fork_choice: F, + fork_choice: T::ForkChoice, ) -> Result { let state_root = genesis_state.canonical_root(); store.put(&state_root, &genesis_state)?; @@ -223,7 +224,7 @@ where Err(BeaconStateError::SlotOutOfBounds) => { // Read the earliest historic state in the current slot. let earliest_historic_slot = - state.slot - Slot::from(E::SlotsPerHistoricalRoot::to_usize()); + state.slot - Slot::from(T::EthSpec::slots_per_historical_root()); // Load the earlier state from disk. let new_state_root = state.get_state_root(earliest_historic_slot)?; @@ -263,7 +264,7 @@ where &self, new_beacon_block: BeaconBlock, new_beacon_block_root: Hash256, - new_beacon_state: BeaconState, + new_beacon_state: BeaconState, new_beacon_state_root: Hash256, ) { debug!( @@ -285,7 +286,7 @@ where /// It is important to note that the `beacon_state` returned may not match the present slot. It /// is the state as it was when the head block was received, which could be some slots prior to /// now. - pub fn head(&self) -> RwLockReadGuard> { + pub fn head(&self) -> RwLockReadGuard> { self.canonical_head.read() } @@ -295,7 +296,7 @@ where /// state and calling `catchup_state` as it will not result in an old state being installed and /// then having it iteratively updated -- in such a case it's possible for another thread to /// find the state at an old slot. - pub fn update_state(&self, mut state: BeaconState) -> Result<(), Error> { + pub fn update_state(&self, mut state: BeaconState) -> Result<(), Error> { let present_slot = match self.slot_clock.present_slot() { Ok(Some(slot)) => slot, _ => return Err(Error::UnableToReadSlot), @@ -350,7 +351,7 @@ where &self, new_beacon_block: BeaconBlock, new_beacon_block_root: Hash256, - new_beacon_state: BeaconState, + new_beacon_state: BeaconState, new_beacon_state_root: Hash256, ) { let mut finalized_head = self.finalized_head.write(); @@ -364,7 +365,7 @@ where /// Returns a read-lock guarded `CheckPoint` struct for reading the justified head (as chosen, /// indirectly, by the fork-choice rule). - pub fn finalized_head(&self) -> RwLockReadGuard> { + pub fn finalized_head(&self) -> RwLockReadGuard> { self.finalized_head.read() } @@ -602,7 +603,7 @@ where // significantly lower exposure surface to DoS attacks. // Transition the parent state to the block slot. - let mut state: BeaconState = parent_state; + let mut state: BeaconState = parent_state; for _ in state.slot.as_u64()..block.slot.as_u64() { if let Err(e) = per_slot_processing(&mut state, &self.spec) { return Ok(BlockProcessingOutcome::InvalidBlock( @@ -657,7 +658,7 @@ where pub fn produce_block( &self, randao_reveal: Signature, - ) -> Result<(BeaconBlock, BeaconState), BlockProductionError> { + ) -> Result<(BeaconBlock, BeaconState), BlockProductionError> { debug!("Producing block at slot {}...", self.state.read().slot); let mut state = self.state.read().clone(); @@ -728,7 +729,7 @@ where .ok_or_else(|| Error::MissingBeaconBlock(new_head))?; let block_root = block.canonical_root(); - let state: BeaconState = self + let state: BeaconState = self .store .get(&block.state_root)? .ok_or_else(|| Error::MissingBeaconState(block.state_root))?; @@ -752,7 +753,7 @@ where /// /// This could be a very expensive operation and should only be done in testing/analysis /// activities. - pub fn chain_dump(&self) -> Result>, Error> { + pub fn chain_dump(&self) -> Result>, Error> { let mut dump = vec![]; let mut last_slot = CheckPoint { diff --git a/beacon_node/beacon_chain/src/initialise.rs b/beacon_node/beacon_chain/src/initialise.rs deleted file mode 100644 index b9d950ed54..0000000000 --- a/beacon_node/beacon_chain/src/initialise.rs +++ /dev/null @@ -1,153 +0,0 @@ -// Initialisation functions to generate a new BeaconChain. -// Note: A new version of ClientTypes may need to be implemented for the lighthouse -// testnet. These are examples. Also. there is code duplication which can/should be cleaned up. - -use crate::BeaconChain; -use fork_choice::BitwiseLMDGhost; -use slot_clock::SystemTimeSlotClock; -use std::path::PathBuf; -use std::sync::Arc; -use store::{DiskStore, MemoryStore}; -use tree_hash::TreeHash; -use types::test_utils::TestingBeaconStateBuilder; -use types::{BeaconBlock, ChainSpec, FewValidatorsEthSpec, FoundationEthSpec, Hash256}; - -//TODO: Correct this for prod -//TODO: Account for historical db -pub fn initialise_beacon_chain( - spec: &ChainSpec, - db_name: Option<&PathBuf>, -) -> Arc< - BeaconChain< - DiskStore, - SystemTimeSlotClock, - BitwiseLMDGhost, - FoundationEthSpec, - >, -> { - let path = db_name.expect("db_name cannot be None."); - let store = DiskStore::open(path).expect("Unable to open DB."); - let store = Arc::new(store); - - let state_builder = TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(8, &spec); - let (genesis_state, _keypairs) = state_builder.build(); - - let mut genesis_block = BeaconBlock::empty(&spec); - genesis_block.state_root = Hash256::from_slice(&genesis_state.tree_hash_root()); - - // Slot clock - let slot_clock = SystemTimeSlotClock::new( - spec.genesis_slot, - genesis_state.genesis_time, - spec.seconds_per_slot, - ) - .expect("Unable to load SystemTimeSlotClock"); - // Choose the fork choice - let fork_choice = BitwiseLMDGhost::new(store.clone()); - - // Genesis chain - //TODO: Handle error correctly - Arc::new( - BeaconChain::from_genesis( - store, - slot_clock, - genesis_state, - genesis_block, - spec.clone(), - fork_choice, - ) - .expect("Terminate if beacon chain generation fails"), - ) -} - -/// Initialisation of a test beacon chain, uses an in memory db with fixed genesis time. -pub fn initialise_test_beacon_chain_with_memory_db( - spec: &ChainSpec, - _db_name: Option<&PathBuf>, -) -> Arc< - BeaconChain< - MemoryStore, - SystemTimeSlotClock, - BitwiseLMDGhost, - FewValidatorsEthSpec, - >, -> { - let store = Arc::new(MemoryStore::open()); - - let state_builder = TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(8, spec); - let (genesis_state, _keypairs) = state_builder.build(); - - let mut genesis_block = BeaconBlock::empty(spec); - genesis_block.state_root = Hash256::from_slice(&genesis_state.tree_hash_root()); - - // Slot clock - let slot_clock = SystemTimeSlotClock::new( - spec.genesis_slot, - genesis_state.genesis_time, - spec.seconds_per_slot, - ) - .expect("Unable to load SystemTimeSlotClock"); - // Choose the fork choice - let fork_choice = BitwiseLMDGhost::new(store.clone()); - - // Genesis chain - //TODO: Handle error correctly - Arc::new( - BeaconChain::from_genesis( - store, - slot_clock, - genesis_state, - genesis_block, - spec.clone(), - fork_choice, - ) - .expect("Terminate if beacon chain generation fails"), - ) -} - -/// Initialisation of a test beacon chain, uses an in memory db with fixed genesis time. -pub fn initialise_test_beacon_chain_with_disk_db( - spec: &ChainSpec, - db_name: Option<&PathBuf>, -) -> Arc< - BeaconChain< - DiskStore, - SystemTimeSlotClock, - BitwiseLMDGhost, - FewValidatorsEthSpec, - >, -> { - let path = db_name.expect("db_name cannot be None."); - let store = DiskStore::open(path).expect("Unable to open DB."); - let store = Arc::new(store); - - let state_builder = TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(8, spec); - let (genesis_state, _keypairs) = state_builder.build(); - - let mut genesis_block = BeaconBlock::empty(spec); - genesis_block.state_root = Hash256::from_slice(&genesis_state.tree_hash_root()); - - // Slot clock - let slot_clock = SystemTimeSlotClock::new( - spec.genesis_slot, - genesis_state.genesis_time, - spec.seconds_per_slot, - ) - .expect("Unable to load SystemTimeSlotClock"); - // Choose the fork choice - let fork_choice = BitwiseLMDGhost::new(store.clone()); - - // Genesis chain - //TODO: Handle error correctly - Arc::new( - BeaconChain::from_genesis( - store, - slot_clock, - genesis_state, - genesis_block, - spec.clone(), - fork_choice, - ) - .expect("Terminate if beacon chain generation fails"), - ) -} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 6ac01a5d54..9f3058d0bc 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -1,10 +1,10 @@ mod beacon_chain; mod checkpoint; mod errors; -pub mod initialise; -pub mod test_utils; -pub use self::beacon_chain::{BeaconChain, BlockProcessingOutcome, InvalidBlock, ValidBlock}; +pub use self::beacon_chain::{ + BeaconChain, BeaconChainTypes, BlockProcessingOutcome, InvalidBlock, ValidBlock, +}; pub use self::checkpoint::CheckPoint; pub use self::errors::{BeaconChainError, BlockProductionError}; pub use fork_choice; diff --git a/beacon_node/beacon_chain/src/test_utils/mod.rs b/beacon_node/beacon_chain/src/test_utils/mod.rs deleted file mode 100644 index ad251a3c9e..0000000000 --- a/beacon_node/beacon_chain/src/test_utils/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod testing_beacon_chain_builder; - -pub use testing_beacon_chain_builder::TestingBeaconChainBuilder; diff --git a/beacon_node/beacon_chain/src/test_utils/testing_beacon_chain_builder.rs b/beacon_node/beacon_chain/src/test_utils/testing_beacon_chain_builder.rs deleted file mode 100644 index b6b1defcc2..0000000000 --- a/beacon_node/beacon_chain/src/test_utils/testing_beacon_chain_builder.rs +++ /dev/null @@ -1,49 +0,0 @@ -pub use crate::{BeaconChain, BeaconChainError, CheckPoint}; -use fork_choice::BitwiseLMDGhost; -use slot_clock::TestingSlotClock; -use std::sync::Arc; -use store::MemoryStore; -use tree_hash::TreeHash; -use types::*; -use types::{test_utils::TestingBeaconStateBuilder, EthSpec, FewValidatorsEthSpec}; - -type TestingBeaconChain = BeaconChain< - MemoryStore, - TestingSlotClock, - BitwiseLMDGhost, - E, ->; - -pub struct TestingBeaconChainBuilder { - state_builder: TestingBeaconStateBuilder, -} - -impl TestingBeaconChainBuilder { - pub fn build(self, spec: &ChainSpec) -> TestingBeaconChain { - let store = Arc::new(MemoryStore::open()); - let slot_clock = TestingSlotClock::new(spec.genesis_slot.as_u64()); - let fork_choice = BitwiseLMDGhost::new(store.clone()); - - let (genesis_state, _keypairs) = self.state_builder.build(); - - let mut genesis_block = BeaconBlock::empty(&spec); - genesis_block.state_root = Hash256::from_slice(&genesis_state.tree_hash_root()); - - // Create the Beacon Chain - BeaconChain::from_genesis( - store, - slot_clock, - genesis_state, - genesis_block, - spec.clone(), - fork_choice, - ) - .unwrap() - } -} - -impl From> for TestingBeaconChainBuilder { - fn from(state_builder: TestingBeaconStateBuilder) -> TestingBeaconChainBuilder { - TestingBeaconChainBuilder { state_builder } - } -} diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index 6634e260d5..387bf16757 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -12,6 +12,7 @@ http_server = { path = "../http_server" } rpc = { path = "../rpc" } fork_choice = { path = "../../eth2/fork_choice" } types = { path = "../../eth2/types" } +tree_hash = { path = "../../eth2/utils/tree_hash" } slot_clock = { path = "../../eth2/utils/slot_clock" } error-chain = "0.12.0" slog = "^2.2.3" diff --git a/beacon_node/client/src/beacon_chain_types.rs b/beacon_node/client/src/beacon_chain_types.rs new file mode 100644 index 0000000000..b8236c679f --- /dev/null +++ b/beacon_node/client/src/beacon_chain_types.rs @@ -0,0 +1,109 @@ +use crate::ClientConfig; +use beacon_chain::{ + fork_choice::BitwiseLMDGhost, + slot_clock::SystemTimeSlotClock, + store::{DiskStore, MemoryStore, Store}, + BeaconChain, BeaconChainTypes, +}; +use std::sync::Arc; +use tree_hash::TreeHash; +use types::{ + test_utils::TestingBeaconStateBuilder, BeaconBlock, EthSpec, FewValidatorsEthSpec, Hash256, +}; + +/// Provides a new, initialized `BeaconChain` +pub trait InitialiseBeaconChain { + fn initialise_beacon_chain(config: &ClientConfig) -> BeaconChain; +} + +/// A testnet-suitable BeaconChainType, using `MemoryStore`. +#[derive(Clone)] +pub struct TestnetMemoryBeaconChainTypes; + +impl BeaconChainTypes for TestnetMemoryBeaconChainTypes { + type Store = MemoryStore; + type SlotClock = SystemTimeSlotClock; + type ForkChoice = BitwiseLMDGhost; + type EthSpec = FewValidatorsEthSpec; +} + +impl InitialiseBeaconChain for TestnetMemoryBeaconChainTypes +where + T: BeaconChainTypes< + Store = MemoryStore, + SlotClock = SystemTimeSlotClock, + ForkChoice = BitwiseLMDGhost, + >, +{ + fn initialise_beacon_chain(_config: &ClientConfig) -> BeaconChain { + initialize_chain(MemoryStore::open()) + } +} + +/// A testnet-suitable BeaconChainType, using `DiskStore`. +#[derive(Clone)] +pub struct TestnetDiskBeaconChainTypes; + +impl BeaconChainTypes for TestnetDiskBeaconChainTypes { + type Store = DiskStore; + type SlotClock = SystemTimeSlotClock; + type ForkChoice = BitwiseLMDGhost; + type EthSpec = FewValidatorsEthSpec; +} + +impl InitialiseBeaconChain for TestnetDiskBeaconChainTypes +where + T: BeaconChainTypes< + Store = DiskStore, + SlotClock = SystemTimeSlotClock, + ForkChoice = BitwiseLMDGhost, + >, +{ + fn initialise_beacon_chain(config: &ClientConfig) -> BeaconChain { + let store = DiskStore::open(&config.db_name).expect("Unable to open DB."); + + initialize_chain(store) + } +} + +/// Produces a `BeaconChain` given some pre-initialized `Store`. +fn initialize_chain(store: U) -> BeaconChain +where + T: BeaconChainTypes< + Store = U, + SlotClock = SystemTimeSlotClock, + ForkChoice = BitwiseLMDGhost, + >, +{ + let spec = T::EthSpec::spec(); + + let store = Arc::new(store); + + let state_builder = TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(8, &spec); + let (genesis_state, _keypairs) = state_builder.build(); + + let mut genesis_block = BeaconBlock::empty(&spec); + genesis_block.state_root = Hash256::from_slice(&genesis_state.tree_hash_root()); + + // Slot clock + let slot_clock = SystemTimeSlotClock::new( + spec.genesis_slot, + genesis_state.genesis_time, + spec.seconds_per_slot, + ) + .expect("Unable to load SystemTimeSlotClock"); + // Choose the fork choice + let fork_choice = BitwiseLMDGhost::new(store.clone()); + + // Genesis chain + //TODO: Handle error correctly + BeaconChain::from_genesis( + store, + slot_clock, + genesis_state, + genesis_block, + spec.clone(), + fork_choice, + ) + .expect("Terminate if beacon chain generation fails") +} diff --git a/beacon_node/client/src/client_types.rs b/beacon_node/client/src/client_types.rs deleted file mode 100644 index 4cce42a064..0000000000 --- a/beacon_node/client/src/client_types.rs +++ /dev/null @@ -1,65 +0,0 @@ -use crate::{ArcBeaconChain, ClientConfig}; -use beacon_chain::{ - fork_choice::BitwiseLMDGhost, - initialise, - slot_clock::{SlotClock, SystemTimeSlotClock}, - store::{DiskStore, MemoryStore, Store}, -}; -use fork_choice::ForkChoice; -use types::{EthSpec, FewValidatorsEthSpec, FoundationEthSpec}; - -pub trait ClientTypes { - type DB: Store + 'static; - type SlotClock: SlotClock + 'static; - type ForkChoice: ForkChoice + 'static; - type EthSpec: EthSpec + 'static; - - fn initialise_beacon_chain( - config: &ClientConfig, - ) -> ArcBeaconChain; -} - -pub struct StandardClientType; - -impl ClientTypes for StandardClientType { - type DB = DiskStore; - type SlotClock = SystemTimeSlotClock; - type ForkChoice = BitwiseLMDGhost; - type EthSpec = FoundationEthSpec; - - fn initialise_beacon_chain( - config: &ClientConfig, - ) -> ArcBeaconChain { - initialise::initialise_beacon_chain(&config.spec, Some(&config.db_name)) - } -} - -pub struct MemoryStoreTestingClientType; - -impl ClientTypes for MemoryStoreTestingClientType { - type DB = MemoryStore; - type SlotClock = SystemTimeSlotClock; - type ForkChoice = BitwiseLMDGhost; - type EthSpec = FewValidatorsEthSpec; - - fn initialise_beacon_chain( - config: &ClientConfig, - ) -> ArcBeaconChain { - initialise::initialise_test_beacon_chain_with_memory_db(&config.spec, None) - } -} - -pub struct DiskStoreTestingClientType; - -impl ClientTypes for DiskStoreTestingClientType { - type DB = DiskStore; - type SlotClock = SystemTimeSlotClock; - type ForkChoice = BitwiseLMDGhost; - type EthSpec = FewValidatorsEthSpec; - - fn initialise_beacon_chain( - config: &ClientConfig, - ) -> ArcBeaconChain { - initialise::initialise_test_beacon_chain_with_disk_db(&config.spec, Some(&config.db_name)) - } -} diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 9445799d57..40be9b7b8e 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -1,15 +1,13 @@ extern crate slog; +mod beacon_chain_types; mod client_config; -pub mod client_types; pub mod error; pub mod notifier; use beacon_chain::BeaconChain; -pub use client_config::{ClientConfig, DBType}; -pub use client_types::ClientTypes; +use beacon_chain_types::InitialiseBeaconChain; use exit_future::Signal; -use fork_choice::ForkChoice; use futures::{future::Future, Stream}; use network::Service as NetworkService; use slog::{error, info, o}; @@ -17,22 +15,22 @@ use slot_clock::SlotClock; use std::marker::PhantomData; use std::sync::Arc; use std::time::{Duration, Instant}; -use store::Store; use tokio::runtime::TaskExecutor; use tokio::timer::Interval; -use types::EthSpec; -type ArcBeaconChain = Arc>; +pub use beacon_chain::BeaconChainTypes; +pub use beacon_chain_types::{TestnetDiskBeaconChainTypes, TestnetMemoryBeaconChainTypes}; +pub use client_config::{ClientConfig, DBType}; /// Main beacon node client service. This provides the connection and initialisation of the clients /// sub-services in multiple threads. -pub struct Client { +pub struct Client { /// Configuration for the lighthouse client. _config: ClientConfig, /// The beacon chain for the running client. - _beacon_chain: ArcBeaconChain, + _beacon_chain: Arc>, /// Reference to the network service. - pub network: Arc>, + pub network: Arc>, /// Signal to terminate the RPC server. pub rpc_exit_signal: Option, /// Signal to terminate the HTTP server. @@ -45,7 +43,10 @@ pub struct Client { phantom: PhantomData, } -impl Client { +impl Client +where + T: BeaconChainTypes + InitialiseBeaconChain + Clone + 'static, +{ /// Generate an instance of the client. Spawn and link all internal sub-processes. pub fn new( config: ClientConfig, @@ -53,7 +54,7 @@ impl Client { executor: &TaskExecutor, ) -> error::Result { // generate a beacon chain - let beacon_chain = TClientType::initialise_beacon_chain(&config); + let beacon_chain = Arc::new(T::initialise_beacon_chain(&config)); if beacon_chain.read_slot_clock().is_none() { panic!("Cannot start client before genesis!") @@ -158,13 +159,7 @@ impl Client { } } -fn do_state_catchup(chain: &Arc>, log: &slog::Logger) -where - T: Store, - U: SlotClock, - F: ForkChoice, - E: EthSpec, -{ +fn do_state_catchup(chain: &Arc>, log: &slog::Logger) { if let Some(genesis_height) = chain.slots_since_genesis() { let result = chain.catchup_state(); diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index aa1e43c3cd..977342b1a1 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -1,5 +1,5 @@ use crate::Client; -use crate::ClientTypes; +use beacon_chain::BeaconChainTypes; use exit_future::Exit; use futures::{Future, Stream}; use slog::{debug, o}; @@ -10,7 +10,11 @@ use tokio::timer::Interval; /// Thread that monitors the client and reports useful statistics to the user. -pub fn run(client: &Client, executor: TaskExecutor, exit: Exit) { +pub fn run( + client: &Client, + executor: TaskExecutor, + exit: Exit, +) { // notification heartbeat let interval = Interval::new(Instant::now(), Duration::from_secs(5)); diff --git a/beacon_node/http_server/src/lib.rs b/beacon_node/http_server/src/lib.rs index b2c3a86fc3..13754980d0 100644 --- a/beacon_node/http_server/src/lib.rs +++ b/beacon_node/http_server/src/lib.rs @@ -1,6 +1,6 @@ mod prometheus_handler; -use beacon_chain::BeaconChain; +use beacon_chain::{BeaconChain, BeaconChainTypes}; use futures::Future; use iron::prelude::*; use iron::{status::Status, Handler, IronResult, Request, Response}; @@ -10,7 +10,6 @@ use router::Router; use slog::{info, o, warn}; use std::sync::Arc; use tokio::runtime::TaskExecutor; -use types::EthSpec; #[derive(PartialEq, Clone, Debug)] pub struct HttpServerConfig { @@ -45,15 +44,9 @@ impl Handler for IndexHandler { } } -pub fn create_iron_http_server( - beacon_chain: Arc>, -) -> Iron -where - T: store::Store + 'static, - U: slot_clock::SlotClock + 'static, - F: fork_choice::ForkChoice + 'static, - E: EthSpec + 'static, -{ +pub fn create_iron_http_server( + beacon_chain: Arc>, +) -> Iron { let index_handler = IndexHandler { message: "Hello world".to_string(), }; @@ -67,19 +60,13 @@ where Iron::new(router) } -pub fn start_service( +pub fn start_service( config: &HttpServerConfig, executor: &TaskExecutor, _network_chan: crossbeam_channel::Sender, - beacon_chain: Arc>, + beacon_chain: Arc>, log: &slog::Logger, -) -> exit_future::Signal -where - T: store::Store + 'static, - U: slot_clock::SlotClock + 'static, - F: fork_choice::ForkChoice + 'static, - E: EthSpec + 'static, -{ +) -> exit_future::Signal { let log = log.new(o!("Service"=>"HTTP")); // Create: diff --git a/beacon_node/http_server/src/prometheus_handler.rs b/beacon_node/http_server/src/prometheus_handler.rs index ae84d48713..60f56084cd 100644 --- a/beacon_node/http_server/src/prometheus_handler.rs +++ b/beacon_node/http_server/src/prometheus_handler.rs @@ -1,22 +1,17 @@ -use beacon_chain::BeaconChain; +use beacon_chain::{BeaconChain, BeaconChainTypes}; use iron::{status::Status, Handler, IronResult, Request, Response}; use prometheus::{Encoder, IntCounter, Opts, Registry, TextEncoder}; +use slot_clock::SlotClock; use std::sync::Arc; -use types::{EthSpec, Slot}; +use types::Slot; -pub struct PrometheusHandler { - pub beacon_chain: Arc>, +pub struct PrometheusHandler { + pub beacon_chain: Arc>, } -impl PrometheusHandler where E: EthSpec {} +impl PrometheusHandler {} -impl Handler for PrometheusHandler -where - E: EthSpec + 'static, - U: slot_clock::SlotClock + Send + Sync + 'static, - T: Send + Sync + 'static, - F: Send + Sync + 'static, -{ +impl Handler for PrometheusHandler { fn handle(&self, _: &mut Request) -> IronResult { let r = Registry::new(); diff --git a/beacon_node/network/src/beacon_chain.rs b/beacon_node/network/src/beacon_chain.rs index 2a42376f7c..6324e3a940 100644 --- a/beacon_node/network/src/beacon_chain.rs +++ b/beacon_node/network/src/beacon_chain.rs @@ -1,9 +1,6 @@ use beacon_chain::BeaconChain as RawBeaconChain; use beacon_chain::{ - fork_choice::ForkChoice, parking_lot::RwLockReadGuard, - slot_clock::SlotClock, - store::Store, types::{BeaconState, ChainSpec}, AttestationValidationError, CheckPoint, }; @@ -12,17 +9,17 @@ use types::{ Attestation, BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, EthSpec, Hash256, Slot, }; -pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome, InvalidBlock}; +pub use beacon_chain::{BeaconChainError, BeaconChainTypes, BlockProcessingOutcome, InvalidBlock}; /// The network's API to the beacon chain. -pub trait BeaconChain: Send + Sync { +pub trait BeaconChain: Send + Sync { fn get_spec(&self) -> &ChainSpec; - fn get_state(&self) -> RwLockReadGuard>; + fn get_state(&self) -> RwLockReadGuard>; fn slot(&self) -> Slot; - fn head(&self) -> RwLockReadGuard>; + fn head(&self) -> RwLockReadGuard>; fn get_block(&self, block_root: &Hash256) -> Result, BeaconChainError>; @@ -30,7 +27,7 @@ pub trait BeaconChain: Send + Sync { fn best_block_root(&self) -> Hash256; - fn finalized_head(&self) -> RwLockReadGuard>; + fn finalized_head(&self) -> RwLockReadGuard>; fn finalized_epoch(&self) -> Epoch; @@ -64,18 +61,12 @@ pub trait BeaconChain: Send + Sync { fn is_new_block_root(&self, beacon_block_root: &Hash256) -> Result; } -impl BeaconChain for RawBeaconChain -where - T: Store, - U: SlotClock, - F: ForkChoice, - E: EthSpec, -{ +impl BeaconChain for RawBeaconChain { fn get_spec(&self) -> &ChainSpec { &self.spec } - fn get_state(&self) -> RwLockReadGuard> { + fn get_state(&self) -> RwLockReadGuard> { self.state.read() } @@ -83,7 +74,7 @@ where self.get_state().slot } - fn head(&self) -> RwLockReadGuard> { + fn head(&self) -> RwLockReadGuard> { self.head() } @@ -95,7 +86,7 @@ where self.get_state().finalized_epoch } - fn finalized_head(&self) -> RwLockReadGuard> { + fn finalized_head(&self) -> RwLockReadGuard> { self.finalized_head() } diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index a7d0ff2a13..f6a27ad600 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -1,4 +1,4 @@ -use crate::beacon_chain::BeaconChain; +use crate::beacon_chain::{BeaconChain, BeaconChainTypes}; use crate::error; use crate::service::{NetworkMessage, OutgoingMessage}; use crate::sync::SimpleSync; @@ -13,7 +13,6 @@ use slog::{debug, warn}; use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; -use types::EthSpec; /// Timeout for RPC requests. // const REQUEST_TIMEOUT: Duration = Duration::from_secs(30); @@ -21,11 +20,11 @@ use types::EthSpec; // const HELLO_TIMEOUT: Duration = Duration::from_secs(30); /// Handles messages received from the network and client and organises syncing. -pub struct MessageHandler { +pub struct MessageHandler { /// Currently loaded and initialised beacon chain. - _chain: Arc>, + _chain: Arc>, /// The syncing framework. - sync: SimpleSync, + sync: SimpleSync, /// The context required to send messages to, and process messages from peers. network_context: NetworkContext, /// The `MessageHandler` logger. @@ -45,10 +44,10 @@ pub enum HandlerMessage { PubsubMessage(PeerId, Box), } -impl MessageHandler { +impl MessageHandler { /// Initializes and runs the MessageHandler. pub fn spawn( - beacon_chain: Arc>, + beacon_chain: Arc>, network_send: crossbeam_channel::Sender, executor: &tokio::runtime::TaskExecutor, log: slog::Logger, diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 50454a8759..d87b9e5a9d 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -1,4 +1,4 @@ -use crate::beacon_chain::BeaconChain; +use crate::beacon_chain::{BeaconChain, BeaconChainTypes}; use crate::error; use crate::message_handler::{HandlerMessage, MessageHandler}; use crate::NetworkConfig; @@ -13,20 +13,20 @@ use slog::{debug, info, o, trace}; use std::marker::PhantomData; use std::sync::Arc; use tokio::runtime::TaskExecutor; -use types::{EthSpec, Topic}; +use types::Topic; /// Service that handles communication between internal services and the eth2_libp2p network service. -pub struct Service { +pub struct Service { //libp2p_service: Arc>, _libp2p_exit: oneshot::Sender<()>, network_send: crossbeam_channel::Sender, - _phantom: PhantomData, //message_handler: MessageHandler, + _phantom: PhantomData, //message_handler: MessageHandler, //message_handler_send: Sender } -impl Service { +impl Service { pub fn new( - beacon_chain: Arc>, + beacon_chain: Arc>, config: &NetworkConfig, executor: &TaskExecutor, log: slog::Logger, diff --git a/beacon_node/network/src/sync/import_queue.rs b/beacon_node/network/src/sync/import_queue.rs index 6c2fc33eeb..793f4c395e 100644 --- a/beacon_node/network/src/sync/import_queue.rs +++ b/beacon_node/network/src/sync/import_queue.rs @@ -1,11 +1,11 @@ -use crate::beacon_chain::BeaconChain; +use crate::beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::PeerId; use slog::{debug, error}; use std::sync::Arc; use std::time::{Duration, Instant}; use tree_hash::TreeHash; -use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, EthSpec, Hash256, Slot}; +use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Hash256, Slot}; /// Provides a queue for fully and partially built `BeaconBlock`s. /// @@ -19,8 +19,8 @@ use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, EthSpec, Hash256, S /// `BeaconBlockBody` as the key. /// - It is possible for multiple distinct blocks to have identical `BeaconBlockBodies`. Therefore /// we cannot use a `HashMap` keyed by the root of `BeaconBlockBody`. -pub struct ImportQueue { - pub chain: Arc>, +pub struct ImportQueue { + pub chain: Arc>, /// Partially imported blocks, keyed by the root of `BeaconBlockBody`. pub partials: Vec, /// Time before a queue entry is considered state. @@ -29,9 +29,9 @@ pub struct ImportQueue { log: slog::Logger, } -impl ImportQueue { +impl ImportQueue { /// Return a new, empty queue. - pub fn new(chain: Arc>, stale_time: Duration, log: slog::Logger) -> Self { + pub fn new(chain: Arc>, stale_time: Duration, log: slog::Logger) -> Self { Self { chain, partials: vec![], diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index d44ffd4b75..6ab8ea7d9a 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -1,5 +1,5 @@ use super::import_queue::ImportQueue; -use crate::beacon_chain::{BeaconChain, BlockProcessingOutcome, InvalidBlock}; +use crate::beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome, InvalidBlock}; use crate::message_handler::NetworkContext; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId}; @@ -9,7 +9,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tree_hash::TreeHash; -use types::{Attestation, BeaconBlock, Epoch, EthSpec, Hash256, Slot}; +use types::{Attestation, BeaconBlock, Epoch, Hash256, Slot}; /// The number of slots that we can import blocks ahead of us, before going into full Sync mode. const SLOT_IMPORT_TOLERANCE: u64 = 100; @@ -88,8 +88,8 @@ impl From for PeerSyncInfo { } } -impl From<&Arc>> for PeerSyncInfo { - fn from(chain: &Arc>) -> PeerSyncInfo { +impl From<&Arc>> for PeerSyncInfo { + fn from(chain: &Arc>) -> PeerSyncInfo { Self::from(chain.hello_message()) } } @@ -103,22 +103,22 @@ pub enum SyncState { } /// Simple Syncing protocol. -pub struct SimpleSync { +pub struct SimpleSync { /// A reference to the underlying beacon chain. - chain: Arc>, + chain: Arc>, /// A mapping of Peers to their respective PeerSyncInfo. known_peers: HashMap, /// A queue to allow importing of blocks - import_queue: ImportQueue, + import_queue: ImportQueue, /// The current state of the syncing protocol. state: SyncState, /// Sync logger. log: slog::Logger, } -impl SimpleSync { +impl SimpleSync { /// Instantiate a `SimpleSync` instance, with no peers and an empty queue. - pub fn new(beacon_chain: Arc>, log: &slog::Logger) -> Self { + pub fn new(beacon_chain: Arc>, log: &slog::Logger) -> Self { let sync_logger = log.new(o!("Service"=> "Sync")); let queue_item_stale_time = Duration::from_secs(QUEUE_STALE_SECS); diff --git a/beacon_node/rpc/src/attestation.rs b/beacon_node/rpc/src/attestation.rs index e22715b555..6048e42b1b 100644 --- a/beacon_node/rpc/src/attestation.rs +++ b/beacon_node/rpc/src/attestation.rs @@ -1,4 +1,4 @@ -use crate::beacon_chain::BeaconChain; +use crate::beacon_chain::{BeaconChain, BeaconChainTypes}; use futures::Future; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use protos::services::{ @@ -9,15 +9,15 @@ use protos::services_grpc::AttestationService; use slog::{error, info, trace, warn}; use ssz::{ssz_encode, Decode}; use std::sync::Arc; -use types::{Attestation, EthSpec}; +use types::Attestation; #[derive(Clone)] -pub struct AttestationServiceInstance { - pub chain: Arc>, +pub struct AttestationServiceInstance { + pub chain: Arc>, pub log: slog::Logger, } -impl AttestationService for AttestationServiceInstance { +impl AttestationService for AttestationServiceInstance { /// Produce the `AttestationData` for signing by a validator. fn produce_attestation_data( &mut self, diff --git a/beacon_node/rpc/src/beacon_block.rs b/beacon_node/rpc/src/beacon_block.rs index bbe6a8ee28..e553b79e7f 100644 --- a/beacon_node/rpc/src/beacon_block.rs +++ b/beacon_node/rpc/src/beacon_block.rs @@ -1,4 +1,4 @@ -use crate::beacon_chain::BeaconChain; +use crate::beacon_chain::{BeaconChain, BeaconChainTypes}; use crossbeam_channel; use eth2_libp2p::PubsubMessage; use futures::Future; @@ -13,16 +13,16 @@ use slog::Logger; use slog::{error, info, trace, warn}; use ssz::{ssz_encode, Decode}; use std::sync::Arc; -use types::{BeaconBlock, EthSpec, Signature, Slot}; +use types::{BeaconBlock, Signature, Slot}; #[derive(Clone)] -pub struct BeaconBlockServiceInstance { - pub chain: Arc>, +pub struct BeaconBlockServiceInstance { + pub chain: Arc>, pub network_chan: crossbeam_channel::Sender, pub log: Logger, } -impl BeaconBlockService for BeaconBlockServiceInstance { +impl BeaconBlockService for BeaconBlockServiceInstance { /// Produce a `BeaconBlock` for signing by a validator. fn produce_beacon_block( &mut self, diff --git a/beacon_node/rpc/src/beacon_chain.rs b/beacon_node/rpc/src/beacon_chain.rs index d12baf1d13..b0a490137c 100644 --- a/beacon_node/rpc/src/beacon_chain.rs +++ b/beacon_node/rpc/src/beacon_chain.rs @@ -1,22 +1,19 @@ use beacon_chain::BeaconChain as RawBeaconChain; use beacon_chain::{ - fork_choice::ForkChoice, parking_lot::{RwLockReadGuard, RwLockWriteGuard}, - slot_clock::SlotClock, - store::Store, types::{BeaconState, ChainSpec, Signature}, AttestationValidationError, BlockProductionError, }; -pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome}; +pub use beacon_chain::{BeaconChainError, BeaconChainTypes, BlockProcessingOutcome}; use types::{Attestation, AttestationData, BeaconBlock, EthSpec}; /// The RPC's API to the beacon chain. -pub trait BeaconChain: Send + Sync { +pub trait BeaconChain: Send + Sync { fn get_spec(&self) -> &ChainSpec; - fn get_state(&self) -> RwLockReadGuard>; + fn get_state(&self) -> RwLockReadGuard>; - fn get_mut_state(&self) -> RwLockWriteGuard>; + fn get_mut_state(&self) -> RwLockWriteGuard>; fn process_block(&self, block: BeaconBlock) -> Result; @@ -24,7 +21,7 @@ pub trait BeaconChain: Send + Sync { fn produce_block( &self, randao_reveal: Signature, - ) -> Result<(BeaconBlock, BeaconState), BlockProductionError>; + ) -> Result<(BeaconBlock, BeaconState), BlockProductionError>; fn produce_attestation_data(&self, shard: u64) -> Result; @@ -34,22 +31,16 @@ pub trait BeaconChain: Send + Sync { ) -> Result<(), AttestationValidationError>; } -impl BeaconChain for RawBeaconChain -where - T: Store, - U: SlotClock, - F: ForkChoice, - E: EthSpec, -{ +impl BeaconChain for RawBeaconChain { fn get_spec(&self) -> &ChainSpec { &self.spec } - fn get_state(&self) -> RwLockReadGuard> { + fn get_state(&self) -> RwLockReadGuard> { self.state.read() } - fn get_mut_state(&self) -> RwLockWriteGuard> { + fn get_mut_state(&self) -> RwLockWriteGuard> { self.state.write() } @@ -63,7 +54,7 @@ where fn produce_block( &self, randao_reveal: Signature, - ) -> Result<(BeaconBlock, BeaconState), BlockProductionError> { + ) -> Result<(BeaconBlock, BeaconState), BlockProductionError> { self.produce_block(randao_reveal) } diff --git a/beacon_node/rpc/src/beacon_node.rs b/beacon_node/rpc/src/beacon_node.rs index 2ca39ae512..a923bbb356 100644 --- a/beacon_node/rpc/src/beacon_node.rs +++ b/beacon_node/rpc/src/beacon_node.rs @@ -1,19 +1,18 @@ -use crate::beacon_chain::BeaconChain; +use crate::beacon_chain::{BeaconChain, BeaconChainTypes}; use futures::Future; use grpcio::{RpcContext, UnarySink}; use protos::services::{Empty, Fork, NodeInfoResponse}; use protos::services_grpc::BeaconNodeService; use slog::{trace, warn}; use std::sync::Arc; -use types::EthSpec; #[derive(Clone)] -pub struct BeaconNodeServiceInstance { - pub chain: Arc>, +pub struct BeaconNodeServiceInstance { + pub chain: Arc>, pub log: slog::Logger, } -impl BeaconNodeService for BeaconNodeServiceInstance { +impl BeaconNodeService for BeaconNodeServiceInstance { /// Provides basic node information. fn info(&mut self, ctx: RpcContext, _req: Empty, sink: UnarySink) { trace!(self.log, "Node info requested via RPC"); diff --git a/beacon_node/rpc/src/lib.rs b/beacon_node/rpc/src/lib.rs index f1d5e9c886..9646135b68 100644 --- a/beacon_node/rpc/src/lib.rs +++ b/beacon_node/rpc/src/lib.rs @@ -7,7 +7,7 @@ mod validator; use self::attestation::AttestationServiceInstance; use self::beacon_block::BeaconBlockServiceInstance; -use self::beacon_chain::BeaconChain; +use self::beacon_chain::{BeaconChain, BeaconChainTypes}; use self::beacon_node::BeaconNodeServiceInstance; use self::validator::ValidatorServiceInstance; pub use config::Config as RPCConfig; @@ -21,13 +21,12 @@ use protos::services_grpc::{ use slog::{info, o, warn}; use std::sync::Arc; use tokio::runtime::TaskExecutor; -use types::EthSpec; -pub fn start_server( +pub fn start_server( config: &RPCConfig, executor: &TaskExecutor, network_chan: crossbeam_channel::Sender, - beacon_chain: Arc>, + beacon_chain: Arc>, log: &slog::Logger, ) -> exit_future::Signal { let log = log.new(o!("Service"=>"RPC")); diff --git a/beacon_node/rpc/src/validator.rs b/beacon_node/rpc/src/validator.rs index 34fbba5c49..e58c202d66 100644 --- a/beacon_node/rpc/src/validator.rs +++ b/beacon_node/rpc/src/validator.rs @@ -1,4 +1,4 @@ -use crate::beacon_chain::BeaconChain; +use crate::beacon_chain::{BeaconChain, BeaconChainTypes}; use bls::PublicKey; use futures::Future; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; @@ -7,16 +7,16 @@ use protos::services_grpc::ValidatorService; use slog::{trace, warn}; use ssz::Decode; use std::sync::Arc; -use types::{Epoch, EthSpec, RelativeEpoch}; +use types::{Epoch, RelativeEpoch}; #[derive(Clone)] -pub struct ValidatorServiceInstance { - pub chain: Arc>, +pub struct ValidatorServiceInstance { + pub chain: Arc>, pub log: slog::Logger, } //TODO: Refactor Errors -impl ValidatorService for ValidatorServiceInstance { +impl ValidatorService for ValidatorServiceInstance { /// For a list of validator public keys, this function returns the slot at which each /// validator must propose a block, attest to a shard, their shard committee and the shard they /// need to attest to. diff --git a/beacon_node/src/run.rs b/beacon_node/src/run.rs index 4cf930060f..6ec65a92d4 100644 --- a/beacon_node/src/run.rs +++ b/beacon_node/src/run.rs @@ -1,6 +1,7 @@ -use client::client_types::{DiskStoreTestingClientType, MemoryStoreTestingClientType}; -use client::{error, DBType}; -use client::{notifier, Client, ClientConfig, ClientTypes}; +use client::{ + error, notifier, BeaconChainTypes, Client, ClientConfig, DBType, TestnetDiskBeaconChainTypes, + TestnetMemoryBeaconChainTypes, +}; use futures::sync::oneshot; use futures::Future; use slog::info; @@ -29,9 +30,9 @@ pub fn run_beacon_node(config: ClientConfig, log: &slog::Logger) -> error::Resul info!( log, "BeaconNode starting"; - "type" => "DiskStoreTestingClientType" + "type" => "TestnetDiskBeaconChainTypes" ); - let client: Client = + let client: Client = Client::new(config, log.clone(), &executor)?; run(client, executor, runtime, log) @@ -40,9 +41,9 @@ pub fn run_beacon_node(config: ClientConfig, log: &slog::Logger) -> error::Resul info!( log, "BeaconNode starting"; - "type" => "MemoryStoreTestingClientType" + "type" => "TestnetMemoryBeaconChainTypes" ); - let client: Client = + let client: Client = Client::new(config, log.clone(), &executor)?; run(client, executor, runtime, log) @@ -50,7 +51,7 @@ pub fn run_beacon_node(config: ClientConfig, log: &slog::Logger) -> error::Resul } } -pub fn run( +pub fn run( client: Client, executor: TaskExecutor, mut runtime: Runtime, From 855222fa28b291c9b0e3335e33c05f9b2dfd5f58 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sun, 26 May 2019 15:59:52 +1000 Subject: [PATCH 06/10] Rename prom HTTP endpoint --- beacon_node/http_server/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/http_server/src/lib.rs b/beacon_node/http_server/src/lib.rs index 13754980d0..fe7b9b1b32 100644 --- a/beacon_node/http_server/src/lib.rs +++ b/beacon_node/http_server/src/lib.rs @@ -56,7 +56,7 @@ pub fn create_iron_http_server( let mut router = Router::new(); router.get("/", index_handler, "index"); - router.get("/prometheus/", prom_handler, "prometheus"); + router.get("/metrics", prom_handler, "metrics"); Iron::new(router) } From 705edf0e455b2d43c1151a4046a6ac7607893f8e Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 27 May 2019 09:01:50 +1000 Subject: [PATCH 07/10] Remove commented-out code --- beacon_node/http_server/src/lib.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/beacon_node/http_server/src/lib.rs b/beacon_node/http_server/src/lib.rs index fe7b9b1b32..6bc69e72d1 100644 --- a/beacon_node/http_server/src/lib.rs +++ b/beacon_node/http_server/src/lib.rs @@ -15,10 +15,6 @@ use tokio::runtime::TaskExecutor; pub struct HttpServerConfig { pub enabled: bool, pub listen_address: String, - /* - pub listen_address: Ipv4Addr, - pub port: u16, - */ } impl Default for HttpServerConfig { @@ -26,10 +22,6 @@ impl Default for HttpServerConfig { Self { enabled: false, listen_address: "127.0.0.1:5051".to_string(), - /* - listen_address: Ipv4Addr::new(127, 0, 0, 1), - port: 5051, - */ } } } From 255590ef3bca45d8d2306b707a22c58d01858c63 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 27 May 2019 11:34:22 +1000 Subject: [PATCH 08/10] Add `node/fork` endpoint to HTTP API, tidy --- beacon_node/http_server/Cargo.toml | 4 ++ beacon_node/http_server/src/api.rs | 69 +++++++++++++++++++ beacon_node/http_server/src/key.rs | 12 ++++ beacon_node/http_server/src/lib.rs | 38 +++++----- beacon_node/http_server/src/metrics.rs | 57 +++++++++++++++ .../http_server/src/prometheus_handler.rs | 47 ------------- 6 files changed, 158 insertions(+), 69 deletions(-) create mode 100644 beacon_node/http_server/src/api.rs create mode 100644 beacon_node/http_server/src/key.rs create mode 100644 beacon_node/http_server/src/metrics.rs delete mode 100644 beacon_node/http_server/src/prometheus_handler.rs diff --git a/beacon_node/http_server/Cargo.toml b/beacon_node/http_server/Cargo.toml index 6f4579d175..fb8bf9f4b7 100644 --- a/beacon_node/http_server/Cargo.toml +++ b/beacon_node/http_server/Cargo.toml @@ -18,12 +18,16 @@ slot_clock = { path = "../../eth2/utils/slot_clock" } protos = { path = "../../protos" } fork_choice = { path = "../../eth2/fork_choice" } grpcio = { version = "0.4", default-features = false, features = ["protobuf-codec"] } +persistent = "^0.4" protobuf = "2.0.2" prometheus = "^0.6" clap = "2.32.0" store = { path = "../store" } dirs = "1.0.3" futures = "0.1.23" +serde = "1.0" +serde_derive = "1.0" +serde_json = "1.0" slog = "^2.2.3" slog-term = "^2.4.0" slog-async = "^2.3.0" diff --git a/beacon_node/http_server/src/api.rs b/beacon_node/http_server/src/api.rs new file mode 100644 index 0000000000..c89cacd9a5 --- /dev/null +++ b/beacon_node/http_server/src/api.rs @@ -0,0 +1,69 @@ +use crate::key::BeaconChainKey; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use iron::prelude::*; +use iron::{ + headers::{CacheControl, CacheDirective, ContentType}, + status::Status, + AfterMiddleware, Handler, IronResult, Request, Response, +}; +use persistent::Read; +use router::Router; +use serde_json::json; +use std::sync::Arc; + +pub fn build_handler( + beacon_chain: Arc>, +) -> impl Handler { + let mut router = Router::new(); + + router.get("/node/fork", handle_fork::, "fork"); + + let mut chain = Chain::new(router); + + // Insert `BeaconChain` so it may be accessed in a request. + chain.link(Read::>::both(beacon_chain.clone())); + // Set the content-type headers. + chain.link_after(SetJsonContentType); + // Set the cache headers. + chain.link_after(SetCacheDirectives); + + chain +} + +/// Sets the `cache-control` headers on _all_ responses, unless they are already set. +struct SetCacheDirectives; +impl AfterMiddleware for SetCacheDirectives { + fn after(&self, _req: &mut Request, mut resp: Response) -> IronResult { + // This is run for every requests, AFTER all handlers have been executed + if resp.headers.get::() == None { + resp.headers.set(CacheControl(vec![ + CacheDirective::NoCache, + CacheDirective::NoStore, + ])); + } + Ok(resp) + } +} + +/// Sets the `content-type` headers on _all_ responses, unless they are already set. +struct SetJsonContentType; +impl AfterMiddleware for SetJsonContentType { + fn after(&self, _req: &mut Request, mut resp: Response) -> IronResult { + if resp.headers.get::() == None { + resp.headers.set(ContentType::json()); + } + Ok(resp) + } +} + +fn handle_fork(req: &mut Request) -> IronResult { + // TODO: investigate unwrap - I'm _guessing_ we'll never hit it but we should check to be sure. + let beacon_chain = req.get::>>().unwrap(); + + let response = json!({ + "fork": beacon_chain.head().beacon_state.fork, + "chain_id": beacon_chain.spec.chain_id + }); + + Ok(Response::with((Status::Ok, response.to_string()))) +} diff --git a/beacon_node/http_server/src/key.rs b/beacon_node/http_server/src/key.rs new file mode 100644 index 0000000000..2d27ce9f06 --- /dev/null +++ b/beacon_node/http_server/src/key.rs @@ -0,0 +1,12 @@ +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use iron::typemap::Key; +use std::marker::PhantomData; +use std::sync::Arc; + +pub struct BeaconChainKey { + _phantom: PhantomData, +} + +impl Key for BeaconChainKey { + type Value = Arc>; +} diff --git a/beacon_node/http_server/src/lib.rs b/beacon_node/http_server/src/lib.rs index 6bc69e72d1..8f6378e9aa 100644 --- a/beacon_node/http_server/src/lib.rs +++ b/beacon_node/http_server/src/lib.rs @@ -1,11 +1,11 @@ -mod prometheus_handler; +mod api; +mod key; +mod metrics; use beacon_chain::{BeaconChain, BeaconChainTypes}; use futures::Future; use iron::prelude::*; -use iron::{status::Status, Handler, IronResult, Request, Response}; use network::NetworkMessage; -use prometheus_handler::PrometheusHandler; use router::Router; use slog::{info, o, warn}; use std::sync::Arc; @@ -26,32 +26,26 @@ impl Default for HttpServerConfig { } } -pub struct IndexHandler { - message: String, -} - -impl Handler for IndexHandler { - fn handle(&self, _: &mut Request) -> IronResult { - Ok(Response::with((Status::Ok, self.message.clone()))) - } -} - +/// Build the `iron` HTTP server, defining the core routes. pub fn create_iron_http_server( beacon_chain: Arc>, ) -> Iron { - let index_handler = IndexHandler { - message: "Hello world".to_string(), - }; - let prom_handler = PrometheusHandler { - beacon_chain: beacon_chain, - }; - let mut router = Router::new(); - router.get("/", index_handler, "index"); - router.get("/metrics", prom_handler, "metrics"); + + // A `GET` request to `/metrics` is handled by the `metrics` module. + router.get( + "/metrics", + metrics::build_handler(beacon_chain.clone()), + "metrics", + ); + + // Any request to all other endpoints is handled by the `api` module. + router.any("/*", api::build_handler(beacon_chain.clone()), "api"); + Iron::new(router) } +/// Start the HTTP service on the tokio `TaskExecutor`. pub fn start_service( config: &HttpServerConfig, executor: &TaskExecutor, diff --git a/beacon_node/http_server/src/metrics.rs b/beacon_node/http_server/src/metrics.rs new file mode 100644 index 0000000000..57fa70623e --- /dev/null +++ b/beacon_node/http_server/src/metrics.rs @@ -0,0 +1,57 @@ +use crate::key::BeaconChainKey; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use iron::prelude::*; +use iron::{status::Status, Handler, IronResult, Request, Response}; +use persistent::Read; +use prometheus::{Encoder, IntCounter, Opts, Registry, TextEncoder}; +use slot_clock::SlotClock; +use std::sync::Arc; +use types::Slot; + +pub fn build_handler( + beacon_chain: Arc>, +) -> impl Handler { + let mut chain = Chain::new(handle_metrics::); + + chain.link(Read::>::both(beacon_chain)); + + chain +} + +/// Handle a request for Prometheus metrics. +/// +/// Returns a text string containing all metrics. +fn handle_metrics(req: &mut Request) -> IronResult { + let beacon_chain = req.get::>>().unwrap(); + + let r = Registry::new(); + + let present_slot = if let Ok(Some(slot)) = beacon_chain.slot_clock.present_slot() { + slot + } else { + Slot::new(0) + }; + register_and_set_slot( + &r, + "present_slot", + "direct_slock_clock_reading", + present_slot, + ); + + // Gather the metrics. + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + let metric_families = r.gather(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + + let prom_string = String::from_utf8(buffer).unwrap(); + + Ok(Response::with((Status::Ok, prom_string))) +} + +fn register_and_set_slot(registry: &Registry, name: &str, help: &str, slot: Slot) { + let counter_opts = Opts::new(name, help); + let counter = IntCounter::with_opts(counter_opts).unwrap(); + registry.register(Box::new(counter.clone())).unwrap(); + counter.inc_by(slot.as_u64() as i64); +} diff --git a/beacon_node/http_server/src/prometheus_handler.rs b/beacon_node/http_server/src/prometheus_handler.rs deleted file mode 100644 index 60f56084cd..0000000000 --- a/beacon_node/http_server/src/prometheus_handler.rs +++ /dev/null @@ -1,47 +0,0 @@ -use beacon_chain::{BeaconChain, BeaconChainTypes}; -use iron::{status::Status, Handler, IronResult, Request, Response}; -use prometheus::{Encoder, IntCounter, Opts, Registry, TextEncoder}; -use slot_clock::SlotClock; -use std::sync::Arc; -use types::Slot; - -pub struct PrometheusHandler { - pub beacon_chain: Arc>, -} - -impl PrometheusHandler {} - -impl Handler for PrometheusHandler { - fn handle(&self, _: &mut Request) -> IronResult { - let r = Registry::new(); - - let present_slot = if let Ok(Some(slot)) = self.beacon_chain.slot_clock.present_slot() { - slot - } else { - Slot::new(0) - }; - register_and_set_slot( - &r, - "present_slot", - "direct_slock_clock_reading", - present_slot, - ); - - // Gather the metrics. - let mut buffer = vec![]; - let encoder = TextEncoder::new(); - let metric_families = r.gather(); - encoder.encode(&metric_families, &mut buffer).unwrap(); - - let prom_string = String::from_utf8(buffer).unwrap(); - - Ok(Response::with((Status::Ok, prom_string))) - } -} - -fn register_and_set_slot(registry: &Registry, name: &str, help: &str, slot: Slot) { - let counter_opts = Opts::new(name, help); - let counter = IntCounter::with_opts(counter_opts).unwrap(); - registry.register(Box::new(counter.clone())).unwrap(); - counter.inc_by(slot.as_u64() as i64); -} From 3a65f84b129100d405d7e441a29a1d4659517704 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 27 May 2019 12:56:09 +1000 Subject: [PATCH 09/10] Improve comments for `http_server` --- beacon_node/http_server/src/api.rs | 1 + beacon_node/http_server/src/lib.rs | 2 +- beacon_node/http_server/src/metrics.rs | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/beacon_node/http_server/src/api.rs b/beacon_node/http_server/src/api.rs index c89cacd9a5..11afdb9df3 100644 --- a/beacon_node/http_server/src/api.rs +++ b/beacon_node/http_server/src/api.rs @@ -11,6 +11,7 @@ use router::Router; use serde_json::json; use std::sync::Arc; +/// Yields a handler for the HTTP API. pub fn build_handler( beacon_chain: Arc>, ) -> impl Handler { diff --git a/beacon_node/http_server/src/lib.rs b/beacon_node/http_server/src/lib.rs index 8f6378e9aa..77665db8dd 100644 --- a/beacon_node/http_server/src/lib.rs +++ b/beacon_node/http_server/src/lib.rs @@ -87,7 +87,7 @@ pub fn start_service( info!(log, "HTTP server shutting down"); if let Ok(mut server) = server_start_result { - // According to the documentation, this function "doesn't work" and the server + // According to the documentation, `server.close()` "doesn't work" and the server // keeps listening. // // It is being called anyway, because it seems like the right thing to do. If you diff --git a/beacon_node/http_server/src/metrics.rs b/beacon_node/http_server/src/metrics.rs index 57fa70623e..30acb88531 100644 --- a/beacon_node/http_server/src/metrics.rs +++ b/beacon_node/http_server/src/metrics.rs @@ -8,6 +8,7 @@ use slot_clock::SlotClock; use std::sync::Arc; use types::Slot; +/// Yields a handler for the metrics endpoint. pub fn build_handler( beacon_chain: Arc>, ) -> impl Handler { From ed4d7aa44a03d32cceaabc26c1e4017a6da4987e Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 27 May 2019 17:09:16 +1000 Subject: [PATCH 10/10] Replace `http_server` unwrap with 500 error --- beacon_node/http_server/src/api.rs | 7 ++++--- beacon_node/http_server/src/lib.rs | 8 ++++++++ beacon_node/http_server/src/metrics.rs | 6 ++++-- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/beacon_node/http_server/src/api.rs b/beacon_node/http_server/src/api.rs index 11afdb9df3..a910808998 100644 --- a/beacon_node/http_server/src/api.rs +++ b/beacon_node/http_server/src/api.rs @@ -1,4 +1,4 @@ -use crate::key::BeaconChainKey; +use crate::{key::BeaconChainKey, map_persistent_err_to_500}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use iron::prelude::*; use iron::{ @@ -58,8 +58,9 @@ impl AfterMiddleware for SetJsonContentType { } fn handle_fork(req: &mut Request) -> IronResult { - // TODO: investigate unwrap - I'm _guessing_ we'll never hit it but we should check to be sure. - let beacon_chain = req.get::>>().unwrap(); + let beacon_chain = req + .get::>>() + .map_err(map_persistent_err_to_500)?; let response = json!({ "fork": beacon_chain.head().beacon_state.fork, diff --git a/beacon_node/http_server/src/lib.rs b/beacon_node/http_server/src/lib.rs index 77665db8dd..486badaff2 100644 --- a/beacon_node/http_server/src/lib.rs +++ b/beacon_node/http_server/src/lib.rs @@ -108,3 +108,11 @@ pub fn start_service( shutdown_trigger } + +/// Helper function for mapping a failure to read state to a 500 server error. +fn map_persistent_err_to_500(e: persistent::PersistentError) -> iron::error::IronError { + iron::error::IronError { + error: Box::new(e), + response: iron::Response::with(iron::status::Status::InternalServerError), + } +} diff --git a/beacon_node/http_server/src/metrics.rs b/beacon_node/http_server/src/metrics.rs index 30acb88531..eb7816d0eb 100644 --- a/beacon_node/http_server/src/metrics.rs +++ b/beacon_node/http_server/src/metrics.rs @@ -1,4 +1,4 @@ -use crate::key::BeaconChainKey; +use crate::{key::BeaconChainKey, map_persistent_err_to_500}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use iron::prelude::*; use iron::{status::Status, Handler, IronResult, Request, Response}; @@ -23,7 +23,9 @@ pub fn build_handler( /// /// Returns a text string containing all metrics. fn handle_metrics(req: &mut Request) -> IronResult { - let beacon_chain = req.get::>>().unwrap(); + let beacon_chain = req + .get::>>() + .map_err(map_persistent_err_to_500)?; let r = Registry::new();