From e20a2deebdf2e99f5f2df56902b4b9586e78d7a7 Mon Sep 17 00:00:00 2001 From: Adam Szkoda Date: Sat, 6 Jun 2020 08:39:11 +0200 Subject: [PATCH] Add first Server Sent Events API endpoint (#1107) * Add Server Sent Events API endpoint * Support both event handlers as a transitory measure * Fix merge conflicts --- Cargo.lock | 28 +++++++++ beacon_node/beacon_chain/Cargo.toml | 1 + beacon_node/beacon_chain/src/events.rs | 82 +++++++++++++++++++++++++- beacon_node/client/Cargo.toml | 1 + beacon_node/client/src/builder.rs | 55 ++++++++++++++++- beacon_node/rest_api/Cargo.toml | 2 + beacon_node/rest_api/src/beacon.rs | 52 +++++++++++++++- beacon_node/rest_api/src/error.rs | 6 ++ beacon_node/rest_api/src/lib.rs | 18 ++---- beacon_node/rest_api/src/router.rs | 9 ++- beacon_node/src/lib.rs | 14 +++-- 11 files changed, 244 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 621199f17b..655bdfa7f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -202,6 +202,12 @@ dependencies = [ "webpki-roots 0.19.0", ] +[[package]] +name = "atomic-option" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0db678acb667b525ac40a324fc5f7d3390e29239b31c7327bb8157f5b4fff593" + [[package]] name = "atty" version = "0.2.14" @@ -272,6 +278,7 @@ version = "0.1.2" dependencies = [ "bitvec", "bls", + "bus", "environment", "eth1", "eth2_config", @@ -478,6 +485,18 @@ version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5356f1d23ee24a1f785a56d1d1a5f0fd5b0f6a0c0fb2412ce11da71649ab78f6" +[[package]] +name = "bus" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1e66e1779f5b1440f1a58220ba3b3ded4427175f0a9fb8d7066521f8b4e8f2b" +dependencies = [ + "atomic-option", + "crossbeam-channel", + "num_cpus", + "parking_lot_core 0.7.2", +] + [[package]] name = "byte-slice-cast" version = "0.3.5" @@ -616,6 +635,7 @@ name = "client" version = "0.1.2" dependencies = [ "beacon_chain", + "bus", "dirs", "environment", "error-chain", @@ -3822,6 +3842,7 @@ dependencies = [ "assert_matches", "beacon_chain", "bls", + "bus", "environment", "eth2-libp2p", "eth2_config", @@ -3852,6 +3873,7 @@ dependencies = [ "tokio 0.2.21", "tree_hash", "types", + "uhttp_sse", "url 2.1.1", "version", ] @@ -5372,6 +5394,12 @@ dependencies = [ "tree_hash_derive", ] +[[package]] +name = "uhttp_sse" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6ff93345ba2206230b1bb1aa3ece1a63dd9443b7531024575d16a0680a59444" + [[package]] name = "uint" version = "0.8.3" diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 9b6d80194f..b9d3c960e7 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -47,6 +47,7 @@ bitvec = "0.17.4" bls = { path = "../../crypto/bls" } safe_arith = { path = "../../consensus/safe_arith" } environment = { path = "../../lighthouse/environment" } +bus = "2.2.3" [dev-dependencies] lazy_static = "1.4.0" diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index d293bcb508..441d63be10 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -1,6 +1,10 @@ +use bus::Bus; +use parking_lot::Mutex; use serde_derive::{Deserialize, Serialize}; +use slog::{error, Logger}; use std::marker::PhantomData; -use types::{Attestation, Epoch, EthSpec, Hash256, SignedBeaconBlock}; +use std::sync::Arc; +use types::{Attestation, Epoch, EthSpec, Hash256, SignedBeaconBlock, SignedBeaconBlockHash}; pub use websocket_server::WebSocketSender; pub trait EventHandler: Sized + Send + Sync { @@ -18,6 +22,80 @@ impl EventHandler for WebSocketSender { } } +pub struct ServerSentEvents { + // Bus<> is itself Sync + Send. We use Mutex<> here only because of the surrounding code does + // not enforce mutability statically (i.e. relies on interior mutability). + head_changed_queue: Arc>>, + log: Logger, + _phantom: PhantomData, +} + +impl ServerSentEvents { + pub fn new(log: Logger) -> (Self, Arc>>) { + let bus = Bus::new(T::slots_per_epoch() as usize); + let mutex = Mutex::new(bus); + let arc = Arc::new(mutex); + let this = Self { + head_changed_queue: arc.clone(), + log: log, + _phantom: PhantomData, + }; + (this, arc) + } +} + +impl EventHandler for ServerSentEvents { + fn register(&self, kind: EventKind) -> Result<(), String> { + match kind { + EventKind::BeaconHeadChanged { + current_head_beacon_block_root, + .. + } => { + let mut guard = self.head_changed_queue.lock(); + if let Err(_) = guard.try_broadcast(current_head_beacon_block_root.into()) { + error!( + self.log, + "Head change streaming queue full"; + "dropped_change" => format!("{}", current_head_beacon_block_root), + ); + } + Ok(()) + } + _ => Ok(()), + } + } +} + +// An event handler that pushes events to both the websockets handler and the SSE handler. +// Named after the unix `tee` command. Meant as a temporary solution before ditching WebSockets +// completely once SSE functions well enough. +pub struct TeeEventHandler { + websockets_handler: WebSocketSender, + sse_handler: ServerSentEvents, +} + +impl TeeEventHandler { + pub fn new( + log: Logger, + websockets_handler: WebSocketSender, + ) -> Result<(Self, Arc>>), String> { + let (sse_handler, bus) = ServerSentEvents::new(log); + let result = Self { + websockets_handler: websockets_handler, + sse_handler: sse_handler, + }; + Ok((result, bus)) + } +} + +impl EventHandler for TeeEventHandler { + fn register(&self, kind: EventKind) -> Result<(), String> { + self.websockets_handler.register(kind.clone())?; + self.sse_handler.register(kind)?; + Ok(()) + } +} + impl EventHandler for NullEventHandler { fn register(&self, _kind: EventKind) -> Result<(), String> { Ok(()) @@ -30,7 +108,7 @@ impl Default for NullEventHandler { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] #[serde( bound = "T: EthSpec", rename_all = "snake_case", diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index 28ad6d1b56..1b9ef0ab8e 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -40,3 +40,4 @@ eth2_ssz = "0.1.2" lazy_static = "1.4.0" lighthouse_metrics = { path = "../../common/lighthouse_metrics" } time = "0.2.16" +bus = "2.2.3" diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 4b68f93a8e..7c29e0f763 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -1,6 +1,7 @@ use crate::config::{ClientGenesis, Config as ClientConfig}; use crate::notifier::spawn_notifier; use crate::Client; +use beacon_chain::events::TeeEventHandler; use beacon_chain::{ builder::{BeaconChainBuilder, Witness}, eth1_chain::{CachingEth1Backend, Eth1Chain}, @@ -9,12 +10,14 @@ use beacon_chain::{ store::{HotColdDB, MemoryStore, Store, StoreConfig}, BeaconChain, BeaconChainTypes, Eth1ChainBackend, EventHandler, }; +use bus::Bus; use environment::RuntimeContext; use eth1::{Config as Eth1Config, Service as Eth1Service}; use eth2_config::Eth2Config; use eth2_libp2p::NetworkGlobals; use genesis::{interop_genesis_state, Eth1GenesisService}; use network::{NetworkConfig, NetworkMessage, NetworkService}; +use parking_lot::Mutex; use slog::info; use ssz::Decode; use std::net::SocketAddr; @@ -23,7 +26,10 @@ use std::sync::Arc; use std::time::Duration; use timer::spawn_timer; use tokio::sync::mpsc::UnboundedSender; -use types::{test_utils::generate_deterministic_keypairs, BeaconState, ChainSpec, EthSpec}; +use types::{ + test_utils::generate_deterministic_keypairs, BeaconState, ChainSpec, EthSpec, + SignedBeaconBlockHash, +}; use websocket_server::{Config as WebSocketConfig, WebSocketSender}; /// Interval between polling the eth1 node for genesis information. @@ -260,6 +266,7 @@ where mut self, client_config: &ClientConfig, eth2_config: &Eth2Config, + events: Arc>>, ) -> Result { let beacon_chain = self .beacon_chain @@ -296,6 +303,7 @@ where .create_freezer_db_path() .map_err(|_| "unable to read freezer DB dir")?, eth2_config.clone(), + events, ) .map_err(|e| format!("Failed to start HTTP API: {:?}", e))?; @@ -434,6 +442,51 @@ where } } +impl + ClientBuilder< + Witness< + TStore, + TStoreMigrator, + TSlotClock, + TEth1Backend, + TEthSpec, + TeeEventHandler, + >, + > +where + TStore: Store + 'static, + TStoreMigrator: Migrate, + TSlotClock: SlotClock + 'static, + TEth1Backend: Eth1ChainBackend + 'static, + TEthSpec: EthSpec + 'static, +{ + /// Specifies that the `BeaconChain` should publish events using the WebSocket server. + pub fn tee_event_handler( + mut self, + config: WebSocketConfig, + ) -> Result<(Self, Arc>>), String> { + let context = self + .runtime_context + .as_ref() + .ok_or_else(|| "websocket_event_handler requires a runtime_context")? + .service_context("ws".into()); + + let log = context.log().clone(); + let (sender, listening_addr): (WebSocketSender, Option<_>) = if config.enabled { + let (sender, listening_addr) = + websocket_server::start_server(context.executor, &config)?; + (sender, Some(listening_addr)) + } else { + (WebSocketSender::dummy(), None) + }; + + self.websocket_listen_addr = listening_addr; + let (tee_event_handler, bus) = TeeEventHandler::new(log, sender)?; + self.event_handler = Some(tee_event_handler); + Ok((self, bus)) + } +} + impl ClientBuilder< Witness< diff --git a/beacon_node/rest_api/Cargo.toml b/beacon_node/rest_api/Cargo.toml index 18db2094f5..f8cdbfe4f4 100644 --- a/beacon_node/rest_api/Cargo.toml +++ b/beacon_node/rest_api/Cargo.toml @@ -37,6 +37,8 @@ futures = "0.3.5" operation_pool = { path = "../operation_pool" } rayon = "1.3.0" environment = { path = "../../lighthouse/environment" } +uhttp_sse = "0.5.1" +bus = "2.2.3" [dev-dependencies] assert_matches = "1.3.0" diff --git a/beacon_node/rest_api/src/beacon.rs b/beacon_node/rest_api/src/beacon.rs index 7d41535b30..f0040cfefd 100644 --- a/beacon_node/rest_api/src/beacon.rs +++ b/beacon_node/rest_api/src/beacon.rs @@ -3,16 +3,22 @@ use crate::response_builder::ResponseBuilder; use crate::validator::get_state_for_epoch; use crate::{ApiError, ApiResult, UrlQuery}; use beacon_chain::{BeaconChain, BeaconChainTypes, StateSkipConfig}; -use hyper::{Body, Request}; +use bus::BusReader; +use futures::executor::block_on; +use hyper::body::Bytes; +use hyper::{Body, Request, Response}; use rest_types::{ BlockResponse, CanonicalHeadResponse, Committee, HeadBeaconBlock, StateResponse, ValidatorRequest, ValidatorResponse, }; +use std::io::Write; use std::sync::Arc; use store::Store; + +use slog::{error, Logger}; use types::{ AttesterSlashing, BeaconState, EthSpec, Hash256, ProposerSlashing, PublicKeyBytes, - RelativeEpoch, Slot, + RelativeEpoch, SignedBeaconBlockHash, Slot, }; /// HTTP handler to return a `BeaconBlock` at a given `root` or `slot`. @@ -122,6 +128,48 @@ pub fn get_block_root( ResponseBuilder::new(&req)?.body(&root) } +fn make_sse_response_chunk(new_head_hash: SignedBeaconBlockHash) -> std::io::Result { + let mut buffer = Vec::new(); + { + let mut sse_message = uhttp_sse::SseMessage::new(&mut buffer); + let untyped_hash: Hash256 = new_head_hash.into(); + write!(sse_message.data()?, "{:?}", untyped_hash)?; + } + let bytes: Bytes = buffer.into(); + Ok(bytes) +} + +pub fn stream_forks( + log: Logger, + mut events: BusReader, +) -> ApiResult { + let (mut sender, body) = Body::channel(); + std::thread::spawn(move || { + while let Ok(new_head_hash) = events.recv() { + let chunk = match make_sse_response_chunk(new_head_hash) { + Ok(chunk) => chunk, + Err(e) => { + error!(log, "Failed to make SSE chunk"; "error" => e.to_string()); + sender.abort(); + break; + } + }; + if let Err(bytes) = block_on(sender.send_data(chunk)) { + error!(log, "Couldn't stream piece {:?}", bytes); + } + } + }); + let response = Response::builder() + .status(200) + .header("Content-Type", "text/event-stream") + .header("Connection", "Keep-Alive") + .header("Cache-Control", "no-cache") + .header("Access-Control-Allow-Origin", "*") + .body(body) + .map_err(|e| ApiError::ServerError(format!("Failed to build response: {:?}", e)))?; + Ok(response) +} + /// HTTP handler to return the `Fork` of the current head. pub fn get_fork( req: Request, diff --git a/beacon_node/rest_api/src/error.rs b/beacon_node/rest_api/src/error.rs index 0897e62fef..b6ce5182d3 100644 --- a/beacon_node/rest_api/src/error.rs +++ b/beacon_node/rest_api/src/error.rs @@ -71,6 +71,12 @@ impl From for ApiError { } } +impl From for ApiError { + fn from(e: std::io::Error) -> ApiError { + ApiError::ServerError(format!("IO error: {:?}", e)) + } +} + impl StdError for ApiError { fn cause(&self) -> Option<&dyn StdError> { None diff --git a/beacon_node/rest_api/src/lib.rs b/beacon_node/rest_api/src/lib.rs index 0862b3e04d..00043d7b9b 100644 --- a/beacon_node/rest_api/src/lib.rs +++ b/beacon_node/rest_api/src/lib.rs @@ -21,6 +21,7 @@ mod url_query; mod validator; use beacon_chain::{BeaconChain, BeaconChainTypes}; +use bus::Bus; use client_network::NetworkMessage; pub use config::ApiEncodingFormat; use error::{ApiError, ApiResult}; @@ -30,12 +31,13 @@ use futures::future::TryFutureExt; use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Server}; +use parking_lot::Mutex; use slog::{info, warn}; use std::net::SocketAddr; -use std::ops::Deref; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::mpsc; +use types::SignedBeaconBlockHash; use url_query::UrlQuery; pub use crate::helpers::parse_pubkey_bytes; @@ -58,6 +60,7 @@ pub fn start_server( db_path: PathBuf, freezer_db_path: PathBuf, eth2_config: Eth2Config, + events: Arc>>, ) -> Result { let log = executor.log(); let inner_log = log.clone(); @@ -72,6 +75,7 @@ pub fn start_server( let network_channel = network_info.network_chan.clone(); let db_path = db_path.clone(); let freezer_db_path = freezer_db_path.clone(); + let events = events.clone(); async move { Ok::<_, hyper::Error>(service_fn(move |req: Request| { @@ -84,6 +88,7 @@ pub fn start_server( log.clone(), db_path.clone(), freezer_db_path.clone(), + events.clone(), ) })) } @@ -131,14 +136,3 @@ pub fn start_server( Ok(actual_listen_addr) } - -#[derive(Clone)] -pub struct DBPath(PathBuf); - -impl Deref for DBPath { - type Target = PathBuf; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} diff --git a/beacon_node/rest_api/src/router.rs b/beacon_node/rest_api/src/router.rs index 7220927152..d7e41feab2 100644 --- a/beacon_node/rest_api/src/router.rs +++ b/beacon_node/rest_api/src/router.rs @@ -3,14 +3,16 @@ use crate::{ spec, validator, NetworkChannel, }; use beacon_chain::{BeaconChain, BeaconChainTypes}; +use bus::Bus; use eth2_config::Eth2Config; use eth2_libp2p::NetworkGlobals; use hyper::{Body, Error, Method, Request, Response}; +use parking_lot::Mutex; use slog::debug; use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; -use types::Slot; +use types::{SignedBeaconBlockHash, Slot}; // Allowing more than 7 arguments. #[allow(clippy::too_many_arguments)] @@ -23,6 +25,7 @@ pub async fn route( local_log: slog::Logger, db_path: PathBuf, freezer_db_path: PathBuf, + events: Arc>>, ) -> Result, Error> { metrics::inc_counter(&metrics::REQUEST_COUNT); let timer = metrics::start_timer(&metrics::REQUEST_RESPONSE_TIME); @@ -63,6 +66,10 @@ pub async fn route( (&Method::GET, "/beacon/block") => beacon::get_block::(req, beacon_chain), (&Method::GET, "/beacon/block_root") => beacon::get_block_root::(req, beacon_chain), (&Method::GET, "/beacon/fork") => beacon::get_fork::(req, beacon_chain), + (&Method::GET, "/beacon/fork/stream") => { + let reader = events.lock().add_rx(); + beacon::stream_forks::(log, reader) + } (&Method::GET, "/beacon/genesis_time") => beacon::get_genesis_time::(req, beacon_chain), (&Method::GET, "/beacon/genesis_validators_root") => { beacon::get_genesis_validators_root::(req, beacon_chain) diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index cba45edff0..67bd283271 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -10,10 +10,10 @@ pub use client::{Client, ClientBuilder, ClientConfig, ClientGenesis}; pub use config::{get_data_dir, get_eth2_testnet_config, get_testnet_dir}; pub use eth2_config::Eth2Config; +use beacon_chain::events::TeeEventHandler; use beacon_chain::migrate::{BackgroundMigrator, HotColdDB}; use beacon_chain::{ - builder::Witness, eth1_chain::CachingEth1Backend, events::WebSocketSender, - slot_clock::SystemTimeSlotClock, + builder::Witness, eth1_chain::CachingEth1Backend, slot_clock::SystemTimeSlotClock, }; use clap::ArgMatches; use config::get_config; @@ -30,7 +30,7 @@ pub type ProductionClient = Client< SystemTimeSlotClock, CachingEth1Backend>, E, - WebSocketSender, + TeeEventHandler, >, >; @@ -113,15 +113,17 @@ impl ProductionBeaconNode { builder.no_eth1_backend()? }; - let builder = builder + let (builder, events) = builder .system_time_slot_clock()? - .websocket_event_handler(client_config.websocket_server.clone())? + .tee_event_handler(client_config.websocket_server.clone())?; + + let builder = builder .build_beacon_chain()? .network(&mut client_config.network)? .notifier()?; let builder = if client_config.rest_api.enabled { - builder.http_server(&client_config, &http_eth2_config)? + builder.http_server(&client_config, &http_eth2_config, events)? } else { builder };