diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index ad0627078b..33205e97c3 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1,6 +1,7 @@ use crate::checkpoint::CheckPoint; use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; +use crate::events::{EventHandler, EventKind}; use crate::fork_choice::{Error as ForkChoiceError, ForkChoice}; use crate::iter::{ReverseBlockRootIterator, ReverseStateRootIterator}; use crate::metrics; @@ -95,6 +96,7 @@ pub trait BeaconChainTypes: Send + Sync + 'static { type LmdGhost: LmdGhost; type Eth1Chain: Eth1ChainBackend; type EthSpec: types::EthSpec; + type EventHandler: EventHandler; } /// Represents the "Beacon Chain" component of Ethereum 2.0. Allows import of blocks and block @@ -117,6 +119,8 @@ pub struct BeaconChain { /// A state-machine that is updated with information from the network and chooses a canonical /// head block. pub fork_choice: ForkChoice, + /// A handler for events generated by the beacon chain. + pub event_handler: T::EventHandler, /// Logging to CLI, etc. log: Logger, } @@ -126,6 +130,7 @@ impl BeaconChain { pub fn from_genesis( store: Arc, eth1_backend: T::Eth1Chain, + event_handler: T::EventHandler, mut genesis_state: BeaconState, mut genesis_block: BeaconBlock, spec: ChainSpec, @@ -174,6 +179,7 @@ impl BeaconChain { canonical_head, genesis_block_root, fork_choice: ForkChoice::new(store.clone(), &genesis_block, genesis_block_root), + event_handler, store, log, }) @@ -183,6 +189,7 @@ impl BeaconChain { pub fn from_store( store: Arc, eth1_backend: T::Eth1Chain, + event_handler: T::EventHandler, spec: ChainSpec, log: Logger, ) -> Result>, Error> { @@ -219,6 +226,7 @@ impl BeaconChain { slot_clock, fork_choice: ForkChoice::new(store.clone(), last_finalized_block, last_finalized_root), op_pool, + event_handler, eth1_chain: Eth1Chain::new(eth1_backend), canonical_head: RwLock::new(p.canonical_head), genesis_block_root: p.genesis_block_root, @@ -629,6 +637,59 @@ impl BeaconChain { pub fn process_attestation( &self, attestation: Attestation, + ) -> Result { + let outcome = self.process_attestation_internal(attestation.clone()); + + match &outcome { + Ok(outcome) => match outcome { + AttestationProcessingOutcome::Processed => { + trace!( + self.log, + "Beacon attestation imported"; + "shard" => attestation.data.crosslink.shard, + "target_epoch" => attestation.data.target.epoch, + ); + let _ = self + .event_handler + .register(EventKind::BeaconAttestationImported { + attestation: Box::new(attestation), + }); + } + other => { + warn!( + self.log, + "Beacon attestation rejected"; + "reason" => format!("{:?}", other), + ); + let _ = self + .event_handler + .register(EventKind::BeaconAttestationRejected { + reason: format!("Invalid attestation: {:?}", other), + attestation: Box::new(attestation), + }); + } + }, + Err(e) => { + error!( + self.log, + "Beacon attestation processing error"; + "error" => format!("{:?}", e), + ); + let _ = self + .event_handler + .register(EventKind::BeaconAttestationRejected { + reason: format!("Internal error: {:?}", e), + attestation: Box::new(attestation), + }); + } + } + + outcome + } + + pub fn process_attestation_internal( + &self, + attestation: Attestation, ) -> Result { metrics::inc_counter(&metrics::ATTESTATION_PROCESSING_REQUESTS); let timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_TIMES); @@ -932,6 +993,57 @@ impl BeaconChain { pub fn process_block( &self, block: BeaconBlock, + ) -> Result { + let outcome = self.process_block_internal(block.clone()); + + match &outcome { + Ok(outcome) => match outcome { + BlockProcessingOutcome::Processed { block_root } => { + trace!( + self.log, + "Beacon block imported"; + "block_root" => format!("{:?}", block_root), + "block_slot" => format!("{:?}", block_root), + ); + let _ = self.event_handler.register(EventKind::BeaconBlockImported { + block_root: *block_root, + block: Box::new(block), + }); + } + other => { + warn!( + self.log, + "Beacon block rejected"; + "reason" => format!("{:?}", other), + ); + let _ = self.event_handler.register(EventKind::BeaconBlockRejected { + reason: format!("Invalid block: {:?}", other), + block: Box::new(block), + }); + } + }, + Err(e) => { + error!( + self.log, + "Beacon block processing error"; + "error" => format!("{:?}", e), + ); + let _ = self.event_handler.register(EventKind::BeaconBlockRejected { + reason: format!("Internal error: {:?}", e), + block: Box::new(block), + }); + } + } + + outcome + } + + /// Accept some block and attempt to add it to block DAG. + /// + /// Will accept blocks from prior slots, however it will reject any block from a future slot. + fn process_block_internal( + &self, + block: BeaconBlock, ) -> Result { metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS); let full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES); diff --git a/beacon_node/beacon_chain/src/beacon_chain_builder.rs b/beacon_node/beacon_chain/src/beacon_chain_builder.rs index 2a3537020a..357644a2d5 100644 --- a/beacon_node/beacon_chain/src/beacon_chain_builder.rs +++ b/beacon_node/beacon_chain/src/beacon_chain_builder.rs @@ -131,10 +131,11 @@ impl BeaconChainBuilder { self, store: Arc, eth1_backend: T::Eth1Chain, + event_handler: T::EventHandler, ) -> Result, String> { Ok(match self.build_strategy { BuildStrategy::LoadFromStore => { - BeaconChain::from_store(store, eth1_backend, self.spec, self.log) + BeaconChain::from_store(store, eth1_backend, event_handler, self.spec, self.log) .map_err(|e| format!("Error loading BeaconChain from database: {:?}", e))? .ok_or_else(|| format!("Unable to find exising BeaconChain in database."))? } @@ -144,6 +145,7 @@ impl BeaconChainBuilder { } => BeaconChain::from_genesis( store, eth1_backend, + event_handler, genesis_state.as_ref().clone(), genesis_block.as_ref().clone(), self.spec, diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs new file mode 100644 index 0000000000..d690eabf18 --- /dev/null +++ b/beacon_node/beacon_chain/src/events.rs @@ -0,0 +1,51 @@ +use serde_derive::{Deserialize, Serialize}; +use std::marker::PhantomData; +use types::{Attestation, BeaconBlock, EthSpec, Hash256}; + +pub trait EventHandler: Sized + Send + Sync { + fn register(&self, kind: EventKind) -> Result<(), String>; +} + +pub struct NullEventHandler(PhantomData); + +impl EventHandler for NullEventHandler { + fn register(&self, _kind: EventKind) -> Result<(), String> { + Ok(()) + } +} + +impl Default for NullEventHandler { + fn default() -> Self { + NullEventHandler(PhantomData) + } +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde( + bound = "T: EthSpec", + rename_all = "snake_case", + tag = "event", + content = "data" +)] +pub enum EventKind { + BeaconHeadChanged { + reorg: bool, + current_head_beacon_block_root: Hash256, + previous_head_beacon_block_root: Hash256, + }, + BeaconBlockImported { + block_root: Hash256, + block: Box>, + }, + BeaconBlockRejected { + reason: String, + block: Box>, + }, + BeaconAttestationImported { + attestation: Box>, + }, + BeaconAttestationRejected { + reason: String, + attestation: Box>, + }, +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 0361723489..7f7e4ec2b9 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -7,6 +7,7 @@ mod beacon_chain_builder; mod checkpoint; mod errors; mod eth1_chain; +pub mod events; mod fork_choice; mod iter; mod metrics; diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 7670ac74e4..97b802ddfe 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -1,6 +1,6 @@ use crate::{ - AttestationProcessingOutcome, BeaconChain, BeaconChainBuilder, BeaconChainTypes, - BlockProcessingOutcome, InteropEth1ChainBackend, + events::NullEventHandler, AttestationProcessingOutcome, BeaconChain, BeaconChainBuilder, + BeaconChainTypes, BlockProcessingOutcome, InteropEth1ChainBackend, }; use lmd_ghost::LmdGhost; use rayon::prelude::*; @@ -68,6 +68,7 @@ where type LmdGhost = L; type Eth1Chain = InteropEth1ChainBackend; type EthSpec = E; + type EventHandler = NullEventHandler; } /// A testing harness which can instantiate a `BeaconChain` and populate it with blocks and @@ -103,7 +104,11 @@ where let chain = BeaconChainBuilder::quick_start(HARNESS_GENESIS_TIME, &keypairs, spec.clone(), log) .unwrap_or_else(|e| panic!("Failed to create beacon chain builder: {}", e)) - .build(store.clone(), InteropEth1ChainBackend::default()) + .build( + store.clone(), + InteropEth1ChainBackend::default(), + NullEventHandler::default(), + ) .unwrap_or_else(|e| panic!("Failed to build beacon chain: {}", e)); Self { diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index 383318b0d3..e557217938 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" [dependencies] beacon_chain = { path = "../beacon_chain" } +store = { path = "../store" } network = { path = "../network" } eth2-libp2p = { path = "../eth2-libp2p" } rpc = { path = "../rpc" } diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 08674166d6..b4c7c9347a 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -20,18 +20,19 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio::runtime::TaskExecutor; use tokio::timer::Interval; use types::EthSpec; +use websocket_server::WebSocketSender; pub use beacon_chain::{BeaconChainTypes, Eth1ChainBackend, InteropEth1ChainBackend}; pub use config::{BeaconChainStartMethod, Config as ClientConfig, Eth1BackendMethod}; pub use eth2_config::Eth2Config; #[derive(Clone)] -pub struct ClientType { +pub struct RuntimeBeaconChainTypes { _phantom_s: PhantomData, _phantom_e: PhantomData, } -impl BeaconChainTypes for ClientType +impl BeaconChainTypes for RuntimeBeaconChainTypes where S: Store + 'static, E: EthSpec, @@ -41,17 +42,22 @@ where type LmdGhost = ThreadSafeReducedTree; type Eth1Chain = InteropEth1ChainBackend; type EthSpec = E; + type EventHandler = WebSocketSender; } /// Main beacon node client service. This provides the connection and initialisation of the clients /// sub-services in multiple threads. -pub struct Client { +pub struct Client +where + S: Store + Clone + 'static, + E: EthSpec, +{ /// Configuration for the lighthouse client. _client_config: ClientConfig, /// The beacon chain for the running client. - beacon_chain: Arc>, + beacon_chain: Arc>>, /// Reference to the network service. - pub network: Arc>, + pub network: Arc>>, /// Signal to terminate the RPC server. pub rpc_exit_signal: Option, /// Signal to terminate the slot timer. @@ -60,19 +66,22 @@ pub struct Client { pub api_exit_signal: Option, /// The clients logger. log: slog::Logger, + /* /// Marker to pin the beacon chain generics. - phantom: PhantomData, + phantom: PhantomData, + */ } -impl Client +impl Client where - T: BeaconChainTypes + Clone, + S: Store + Clone + 'static, + E: EthSpec, { /// Generate an instance of the client. Spawn and link all internal sub-processes. pub fn new( client_config: ClientConfig, eth2_config: Eth2Config, - store: T::Store, + store: S, log: slog::Logger, executor: &TaskExecutor, ) -> error::Result { @@ -169,11 +178,19 @@ where } }; - let eth1_backend = T::Eth1Chain::new(String::new()).map_err(|e| format!("{:?}", e))?; + let eth1_backend = + InteropEth1ChainBackend::new(String::new()).map_err(|e| format!("{:?}", e))?; - let beacon_chain: Arc> = Arc::new( + // Start the websocket server. + let websocket_sender: WebSocketSender = if client_config.websocket_server.enabled { + websocket_server::start_server(&client_config.websocket_server, &log)? + } else { + WebSocketSender::dummy() + }; + + let beacon_chain: Arc>> = Arc::new( beacon_chain_builder - .build(store, eth1_backend) + .build(store, eth1_backend, websocket_sender) .map_err(error::Error::from)?, ); @@ -229,11 +246,6 @@ where None }; - // Start the websocket server - let _websocket_sender = if client_config.websocket_server.enabled { - websocket_server::start_server::(&client_config.websocket_server, &log)?; - }; - let (slot_timer_exit_signal, exit) = exit_future::signal(); if let Some(duration_to_next_slot) = beacon_chain.slot_clock.duration_to_next_slot() { // set up the validator work interval - start at next slot and proceed every slot @@ -268,12 +280,11 @@ where api_exit_signal, log, network, - phantom: PhantomData, }) } } -impl Drop for Client { +impl Drop for Client { fn drop(&mut self) { // Save the beacon chain to it's store before dropping. let _result = self.beacon_chain.persist(); diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index 343918d4d5..20da963ec3 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -1,11 +1,12 @@ use crate::Client; -use beacon_chain::BeaconChainTypes; use exit_future::Exit; use futures::{Future, Stream}; use slog::{debug, o, warn}; use std::time::{Duration, Instant}; +use store::Store; use tokio::runtime::TaskExecutor; use tokio::timer::Interval; +use types::EthSpec; /// The interval between heartbeat events. pub const HEARTBEAT_INTERVAL_SECONDS: u64 = 15; @@ -17,7 +18,11 @@ pub const WARN_PEER_COUNT: usize = 1; /// durations. /// /// Presently unused, but remains for future use. -pub fn run(client: &Client, executor: TaskExecutor, exit: Exit) { +pub fn run(client: &Client, executor: TaskExecutor, exit: Exit) +where + S: Store + Clone + 'static, + E: EthSpec, +{ // notification heartbeat let interval = Interval::new( Instant::now(), diff --git a/beacon_node/src/run.rs b/beacon_node/src/run.rs index d036ef0c40..3d6607552c 100644 --- a/beacon_node/src/run.rs +++ b/beacon_node/src/run.rs @@ -1,19 +1,17 @@ -use client::{ - error, notifier, BeaconChainTypes, Client, ClientConfig, ClientType, Eth1BackendMethod, - Eth2Config, -}; +use client::{error, notifier, Client, ClientConfig, Eth1BackendMethod, Eth2Config}; use futures::sync::oneshot; use futures::Future; use slog::{error, info}; use std::cell::RefCell; use std::path::Path; use std::path::PathBuf; +use store::Store; use store::{DiskStore, MemoryStore}; use tokio::runtime::Builder; use tokio::runtime::Runtime; use tokio::runtime::TaskExecutor; use tokio_timer::clock::Clock; -use types::{InteropEthSpec, MainnetEthSpec, MinimalEthSpec}; +use types::{EthSpec, InteropEthSpec, MainnetEthSpec, MinimalEthSpec}; /// Reads the configuration and initializes a `BeaconChain` with the required types and parameters. /// @@ -52,14 +50,7 @@ pub fn run_beacon_node( macro_rules! run_client { ($store: ty, $eth_spec: ty) => { - run::>( - &db_path, - client_config, - eth2_config, - executor, - runtime, - log, - ) + run::<$store, $eth_spec>(&db_path, client_config, eth2_config, executor, runtime, log) }; } @@ -82,7 +73,7 @@ pub fn run_beacon_node( } /// Performs the type-generic parts of launching a `BeaconChain`. -fn run( +fn run( db_path: &Path, client_config: ClientConfig, eth2_config: Eth2Config, @@ -91,12 +82,13 @@ fn run( log: &slog::Logger, ) -> error::Result<()> where - T: BeaconChainTypes + Clone, - T::Store: OpenDatabase, + S: Store + Clone + 'static + OpenDatabase, + E: EthSpec, { - let store = T::Store::open_database(&db_path)?; + let store = S::open_database(&db_path)?; - let client: Client = Client::new(client_config, eth2_config, store, log.clone(), &executor)?; + let client: Client = + Client::new(client_config, eth2_config, store, log.clone(), &executor)?; // run service until ctrl-c let (ctrlc_send, ctrlc_oneshot) = oneshot::channel(); diff --git a/beacon_node/websocket_server/Cargo.toml b/beacon_node/websocket_server/Cargo.toml index f846f62b7c..a7bf85b12d 100644 --- a/beacon_node/websocket_server/Cargo.toml +++ b/beacon_node/websocket_server/Cargo.toml @@ -7,10 +7,12 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +beacon_chain = { path = "../beacon_chain" } exit-future = "0.1.3" futures = "0.1.25" serde = "1.0" serde_derive = "1.0" +serde_json = "^1.0" slog = "^2.2.3" tokio = "0.1.16" types = { path = "../../eth2/types" } diff --git a/beacon_node/websocket_server/src/lib.rs b/beacon_node/websocket_server/src/lib.rs index eb28b10bea..1c7d9ddb9b 100644 --- a/beacon_node/websocket_server/src/lib.rs +++ b/beacon_node/websocket_server/src/lib.rs @@ -1,5 +1,7 @@ -use serde_derive::{Deserialize, Serialize}; +use beacon_chain::events::{EventHandler, EventKind}; +use serde::{Deserialize, Serialize}; use slog::{error, info, Logger}; +use std::marker::PhantomData; use std::net::Ipv4Addr; use std::thread; use types::EthSpec; @@ -25,19 +27,44 @@ impl Default for Config { } } -pub struct WebSocketSender { - sender: Sender, +pub struct WebSocketSender { + sender: Option, + _phantom: PhantomData, } -impl WebSocketSender { +impl WebSocketSender { + /// Creates a dummy websocket server that never starts and where all future calls are no-ops. + pub fn dummy() -> Self { + Self { + sender: None, + _phantom: PhantomData, + } + } + pub fn send_string(&self, string: String) -> Result<(), String> { - self.sender - .send(string) - .map_err(|e| format!("Unable to broadcast to websocket clients: {:?}", e)) + if let Some(sender) = &self.sender { + sender + .send(string) + .map_err(|e| format!("Unable to broadcast to websocket clients: {:?}", e)) + } else { + Ok(()) + } } } -pub fn start_server(config: &Config, log: &Logger) -> Result { +impl EventHandler for WebSocketSender { + fn register(&self, kind: EventKind) -> Result<(), String> { + self.send_string( + serde_json::to_string(&kind) + .map_err(|e| format!("Unable to serialize event: {:?}", e))?, + ) + } +} + +pub fn start_server( + config: &Config, + log: &Logger, +) -> Result, String> { let server_string = format!("{}:{}", config.listen_address, config.port); info!( @@ -70,6 +97,7 @@ pub fn start_server(config: &Config, log: &Logger) -> Result