diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index a04f6b1c86..64cd8ddfc8 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -88,15 +88,28 @@ pub trait BeaconChainTypes { type EthSpec: types::EthSpec; } +/// Represents the "Beacon Chain" component of Ethereum 2.0. Allows import of blocks and block +/// operations and chooses a canonical head. pub struct BeaconChain { + /// Persistent storage for blocks, states, etc. Typically an on-disk store, such as LevelDB. pub store: Arc, + /// Reports the current slot, typically based upon the system clock. pub slot_clock: T::SlotClock, + /// Stores all operations (e.g., `Attestation`, `Deposit`, etc) that are candidates for + /// inclusion in a block. pub op_pool: OperationPool, + /// Stores a "snapshot" of the chain at the time the head-of-the-chain block was recieved. canonical_head: RwLock>, - finalized_head: RwLock>, + /// The same state from `self.canonical_head`, but updated at the start of each slot with a + /// skip slot if no block is recieved. This is effectively a cache that avoids repeating calls + /// to `per_slot_processing`. state: RwLock>, - pub spec: ChainSpec, + /// The root of the genesis block. + genesis_block_root: Hash256, + /// A state-machine that is updated with information from the network and chooses a canonical + /// head block. pub fork_choice: RwLock, + /// Stores metrics about this `BeaconChain`. pub metrics: Metrics, } @@ -113,18 +126,12 @@ impl BeaconChain { let state_root = genesis_state.canonical_root(); store.put(&state_root, &genesis_state)?; - let block_root = genesis_block.block_header().canonical_root(); - store.put(&block_root, &genesis_block)?; + let genesis_block_root = genesis_block.block_header().canonical_root(); + store.put(&genesis_block_root, &genesis_block)?; - let finalized_head = RwLock::new(CheckPoint::new( - genesis_block.clone(), - block_root, - genesis_state.clone(), - state_root, - )); let canonical_head = RwLock::new(CheckPoint::new( genesis_block.clone(), - block_root, + genesis_block_root, genesis_state.clone(), state_root, )); @@ -136,9 +143,8 @@ impl BeaconChain { slot_clock, op_pool: OperationPool::new(), state: RwLock::new(genesis_state), - finalized_head, canonical_head, - spec, + genesis_block_root, fork_choice: RwLock::new(fork_choice), metrics: Metrics::new()?, }) @@ -168,10 +174,9 @@ impl BeaconChain { slot_clock, op_pool: OperationPool::default(), canonical_head: RwLock::new(p.canonical_head), - finalized_head: RwLock::new(p.finalized_head), state: RwLock::new(p.state), - spec, fork_choice: RwLock::new(fork_choice), + genesis_block_root: p.genesis_block_root, metrics: Metrics::new()?, })) } @@ -180,7 +185,7 @@ impl BeaconChain { pub fn persist(&self) -> Result<(), Error> { let p: PersistedBeaconChain = PersistedBeaconChain { canonical_head: self.canonical_head.read().clone(), - finalized_head: self.finalized_head.read().clone(), + genesis_block_root: self.genesis_block_root, state: self.state.read().clone(), }; @@ -309,42 +314,38 @@ impl BeaconChain { Ok(self.store.get(block_root)?) } - /// Update the canonical head to some new values. - fn update_canonical_head( - &self, - new_beacon_block: BeaconBlock, - new_beacon_block_root: Hash256, - new_beacon_state: BeaconState, - new_beacon_state_root: Hash256, - ) { - debug!( - "Updating canonical head with block at slot: {}", - new_beacon_block.slot - ); - let mut head = self.canonical_head.write(); - head.update( - new_beacon_block, - new_beacon_block_root, - new_beacon_state, - new_beacon_state_root, - ); - } - - /// Returns a read-lock guarded `CheckPoint` struct for reading the head (as chosen by the - /// fork-choice rule). - /// - /// It is important to note that the `beacon_state` returned may not match the present slot. It - /// is the state as it was when the head block was received, which could be some slots prior to - /// now. - pub fn head(&self) -> RwLockReadGuard> { - self.canonical_head.read() - } - - /// Returns the slot of the highest block in the canonical chain. - pub fn best_slot(&self) -> Slot { - self.canonical_head.read().beacon_block.slot + /// Update the canonical head to `new_head`. + fn update_canonical_head(&self, new_head: CheckPoint) -> Result<(), Error> { + // Update the checkpoint that stores the head of the chain at the time it received the + // block. + *self.canonical_head.write() = new_head; + + // Update the always-at-the-present-slot state we keep around for performance gains. + *self.state.write() = { + let mut state = self.canonical_head.read().beacon_state.clone(); + + let present_slot = match self.slot_clock.present_slot() { + Ok(Some(slot)) => slot, + _ => return Err(Error::UnableToReadSlot), + }; + + // If required, transition the new state to the present slot. + for _ in state.slot.as_u64()..present_slot.as_u64() { + per_slot_processing(&mut state, &T::EthSpec::spec())?; + } + + state.build_all_caches(&T::EthSpec::spec())?; + + state + }; + + // Save `self` to `self.store`. + self.persist()?; + + Ok(()) } + /* /// Updates the canonical `BeaconState` with the supplied state. /// /// Advances the chain forward to the present slot. This method is better than just setting @@ -362,10 +363,10 @@ impl BeaconChain { // If required, transition the new state to the present slot. for _ in state.slot.as_u64()..present_slot.as_u64() { - per_slot_processing(&mut state, &self.spec)?; + per_slot_processing(&mut state, &T::EthSpec::spec())?; } - state.build_all_caches(&self.spec)?; + state.build_all_caches(&T::EthSpec::spec())?; *self.state.write() = state; @@ -373,9 +374,33 @@ impl BeaconChain { Ok(()) } + */ + + /// Returns a read-lock guarded `BeaconState` which is the `canonical_head` that has been + /// updated to match the current slot clock. + pub fn current_state(&self) -> RwLockReadGuard> { + self.state.read() + } + + /// Returns a read-lock guarded `CheckPoint` struct for reading the head (as chosen by the + /// fork-choice rule). + /// + /// It is important to note that the `beacon_state` returned may not match the present slot. It + /// is the state as it was when the head block was received, which could be some slots prior to + /// now. + pub fn head(&self) -> RwLockReadGuard> { + self.canonical_head.read() + } + + /// Returns the slot of the highest block in the canonical chain. + pub fn best_slot(&self) -> Slot { + self.canonical_head.read().beacon_block.slot + } /// Ensures the current canonical `BeaconState` has been transitioned to match the `slot_clock`. pub fn catchup_state(&self) -> Result<(), Error> { + let spec = &T::EthSpec::spec(); + let present_slot = match self.slot_clock.present_slot() { Ok(Some(slot)) => slot, _ => return Err(Error::UnableToReadSlot), @@ -386,13 +411,13 @@ impl BeaconChain { // If required, transition the new state to the present slot. for _ in state.slot.as_u64()..present_slot.as_u64() { // Ensure the next epoch state caches are built in case of an epoch transition. - state.build_epoch_cache(RelativeEpoch::NextWithoutRegistryChange, &self.spec)?; - state.build_epoch_cache(RelativeEpoch::NextWithRegistryChange, &self.spec)?; + state.build_epoch_cache(RelativeEpoch::NextWithoutRegistryChange, spec)?; + state.build_epoch_cache(RelativeEpoch::NextWithRegistryChange, spec)?; - per_slot_processing(&mut *state, &self.spec)?; + per_slot_processing(&mut *state, spec)?; } - state.build_all_caches(&self.spec)?; + state.build_all_caches(spec)?; Ok(()) } @@ -401,34 +426,11 @@ impl BeaconChain { /// /// Ideally this shouldn't be required, however we leave it here for testing. pub fn ensure_state_caches_are_built(&self) -> Result<(), Error> { - self.state.write().build_all_caches(&self.spec)?; + self.state.write().build_all_caches(&T::EthSpec::spec())?; Ok(()) } - /// Update the justified head to some new values. - fn update_finalized_head( - &self, - new_beacon_block: BeaconBlock, - new_beacon_block_root: Hash256, - new_beacon_state: BeaconState, - new_beacon_state_root: Hash256, - ) { - let mut finalized_head = self.finalized_head.write(); - finalized_head.update( - new_beacon_block, - new_beacon_block_root, - new_beacon_state, - new_beacon_state_root, - ); - } - - /// Returns a read-lock guarded `CheckPoint` struct for reading the justified head (as chosen, - /// indirectly, by the fork-choice rule). - pub fn finalized_head(&self) -> RwLockReadGuard> { - self.finalized_head.read() - } - /// Returns the validator index (if any) for the given public key. /// /// Information is retrieved from the present `beacon_state.validator_registry`. @@ -467,13 +469,12 @@ impl BeaconChain { /// genesis. pub fn slots_since_genesis(&self) -> Option { let now = self.read_slot_clock()?; + let genesis_slot = T::EthSpec::spec().genesis_slot; - if now < self.spec.genesis_slot { + if now < genesis_slot { None } else { - Some(SlotHeight::from( - now.as_u64() - self.spec.genesis_slot.as_u64(), - )) + Some(SlotHeight::from(now.as_u64() - genesis_slot.as_u64())) } } @@ -493,12 +494,12 @@ impl BeaconChain { pub fn block_proposer(&self, slot: Slot) -> Result { self.state .write() - .build_epoch_cache(RelativeEpoch::Current, &self.spec)?; + .build_epoch_cache(RelativeEpoch::Current, &T::EthSpec::spec())?; let index = self.state.read().get_beacon_proposer_index( slot, RelativeEpoch::Current, - &self.spec, + &T::EthSpec::spec(), )?; Ok(index) @@ -519,7 +520,7 @@ impl BeaconChain { if let Some(attestation_duty) = self .state .read() - .get_attestation_duties(validator_index, &self.spec)? + .get_attestation_duties(validator_index, &T::EthSpec::spec())? { Ok(Some((attestation_duty.slot, attestation_duty.shard))) } else { @@ -529,7 +530,7 @@ impl BeaconChain { /// Produce an `AttestationData` that is valid for the present `slot` and given `shard`. pub fn produce_attestation_data(&self, shard: u64) -> Result { - trace!("BeaconChain::produce_attestation: shard: {}", shard); + let slots_per_epoch = T::EthSpec::spec().slots_per_epoch; self.metrics.attestation_production_requests.inc(); let timer = self.metrics.attestation_production_times.start_timer(); @@ -540,8 +541,8 @@ impl BeaconChain { .state .read() .slot - .epoch(self.spec.slots_per_epoch) - .start_slot(self.spec.slots_per_epoch); + .epoch(slots_per_epoch) + .start_slot(slots_per_epoch); let target_root = if state.slot == current_epoch_start_slot { // If we're on the first slot of the state's epoch. @@ -554,7 +555,7 @@ impl BeaconChain { *self .state .read() - .get_block_root(current_epoch_start_slot - self.spec.slots_per_epoch)? + .get_block_root(current_epoch_start_slot - slots_per_epoch)? } } else { // If we're not on the first slot of the epoch. @@ -587,9 +588,9 @@ impl BeaconChain { self.metrics.attestation_processing_requests.inc(); let timer = self.metrics.attestation_processing_times.start_timer(); - let result = self - .op_pool - .insert_attestation(attestation, &*self.state.read(), &self.spec); + let result = + self.op_pool + .insert_attestation(attestation, &*self.state.read(), &T::EthSpec::spec()); if result.is_ok() { self.metrics.attestation_production_successes.inc(); @@ -606,19 +607,19 @@ impl BeaconChain { deposit: Deposit, ) -> Result { self.op_pool - .insert_deposit(deposit, &*self.state.read(), &self.spec) + .insert_deposit(deposit, &*self.state.read(), &T::EthSpec::spec()) } /// Accept some exit and queue it for inclusion in an appropriate block. pub fn process_voluntary_exit(&self, exit: VoluntaryExit) -> Result<(), ExitValidationError> { self.op_pool - .insert_voluntary_exit(exit, &*self.state.read(), &self.spec) + .insert_voluntary_exit(exit, &*self.state.read(), &T::EthSpec::spec()) } /// Accept some transfer and queue it for inclusion in an appropriate block. pub fn process_transfer(&self, transfer: Transfer) -> Result<(), TransferValidationError> { self.op_pool - .insert_transfer(transfer, &*self.state.read(), &self.spec) + .insert_transfer(transfer, &*self.state.read(), &T::EthSpec::spec()) } /// Accept some proposer slashing and queue it for inclusion in an appropriate block. @@ -626,8 +627,11 @@ impl BeaconChain { &self, proposer_slashing: ProposerSlashing, ) -> Result<(), ProposerSlashingValidationError> { - self.op_pool - .insert_proposer_slashing(proposer_slashing, &*self.state.read(), &self.spec) + self.op_pool.insert_proposer_slashing( + proposer_slashing, + &*self.state.read(), + &T::EthSpec::spec(), + ) } /// Accept some attester slashing and queue it for inclusion in an appropriate block. @@ -635,8 +639,11 @@ impl BeaconChain { &self, attester_slashing: AttesterSlashing, ) -> Result<(), AttesterSlashingValidationError> { - self.op_pool - .insert_attester_slashing(attester_slashing, &*self.state.read(), &self.spec) + self.op_pool.insert_attester_slashing( + attester_slashing, + &*self.state.read(), + &T::EthSpec::spec(), + ) } /// Accept some block and attempt to add it to block DAG. @@ -686,7 +693,7 @@ impl BeaconChain { // Transition the parent state to the block slot. let mut state: BeaconState = parent_state; for _ in state.slot.as_u64()..block.slot.as_u64() { - if let Err(e) = per_slot_processing(&mut state, &self.spec) { + if let Err(e) = per_slot_processing(&mut state, &T::EthSpec::spec()) { return Ok(BlockProcessingOutcome::InvalidBlock( InvalidBlock::SlotProcessingError(e), )); @@ -695,7 +702,7 @@ impl BeaconChain { // Apply the received block to its parent state (which has been transitioned into this // slot). - if let Err(e) = per_block_processing(&mut state, &block, &self.spec) { + if let Err(e) = per_block_processing(&mut state, &block, &T::EthSpec::spec()) { return Ok(BlockProcessingOutcome::InvalidBlock( InvalidBlock::PerBlockProcessingError(e), )); @@ -716,7 +723,7 @@ impl BeaconChain { // Register the new block with the fork choice service. self.fork_choice .write() - .add_block(&block, &block_root, &self.spec)?; + .add_block(&block, &block_root, &T::EthSpec::spec())?; // Execute the fork choice algorithm, enthroning a new head if discovered. // @@ -744,7 +751,7 @@ impl BeaconChain { let mut state = self.state.read().clone(); - state.build_epoch_cache(RelativeEpoch::Current, &self.spec)?; + state.build_epoch_cache(RelativeEpoch::Current, &T::EthSpec::spec())?; trace!("Finding attestations for new block..."); @@ -752,14 +759,15 @@ impl BeaconChain { .get_block_root(state.slot - 1) .map_err(|_| BlockProductionError::UnableToGetBlockRootFromState)?; - let (proposer_slashings, attester_slashings) = - self.op_pool.get_slashings(&*self.state.read(), &self.spec); + let (proposer_slashings, attester_slashings) = self + .op_pool + .get_slashings(&*self.state.read(), &T::EthSpec::spec()); let mut block = BeaconBlock { slot: state.slot, previous_block_root, state_root: Hash256::zero(), // Updated after the state is calculated. - signature: self.spec.empty_signature.clone(), // To be completed by a validator. + signature: T::EthSpec::spec().empty_signature.clone(), // To be completed by a validator. body: BeaconBlockBody { randao_reveal, eth1_data: Eth1Data { @@ -771,12 +779,16 @@ impl BeaconChain { attester_slashings, attestations: self .op_pool - .get_attestations(&*self.state.read(), &self.spec), - deposits: self.op_pool.get_deposits(&*self.state.read(), &self.spec), + .get_attestations(&*self.state.read(), &T::EthSpec::spec()), + deposits: self + .op_pool + .get_deposits(&*self.state.read(), &T::EthSpec::spec()), voluntary_exits: self .op_pool - .get_voluntary_exits(&*self.state.read(), &self.spec), - transfers: self.op_pool.get_transfers(&*self.state.read(), &self.spec), + .get_voluntary_exits(&*self.state.read(), &T::EthSpec::spec()), + transfers: self + .op_pool + .get_transfers(&*self.state.read(), &T::EthSpec::spec()), }, }; @@ -785,7 +797,11 @@ impl BeaconChain { block.body.attestations.len() ); - per_block_processing_without_verifying_block_signature(&mut state, &block, &self.spec)?; + per_block_processing_without_verifying_block_signature( + &mut state, + &block, + &T::EthSpec::spec(), + )?; let state_root = state.canonical_root(); @@ -801,39 +817,53 @@ impl BeaconChain { pub fn fork_choice(&self) -> Result<(), Error> { self.metrics.fork_choice_requests.inc(); - let present_head_root = self.finalized_head().beacon_block_root; - + // Start fork choice metrics timer. let timer = self.metrics.fork_choice_times.start_timer(); - let new_head_root = self + let justified_root = { + let root = self.head().beacon_state.current_justified_root; + if root == T::EthSpec::spec().zero_hash { + self.genesis_block_root + } else { + root + } + }; + + // Determine the root of the block that is the head of the chain. + let beacon_block_root = self .fork_choice .write() - .find_head(&present_head_root, &self.spec)?; + .find_head(&justified_root, &T::EthSpec::spec())?; + // End fork choice metrics timer. timer.observe_duration(); - if new_head_root != present_head_root { + // If a new head was chosen. + if beacon_block_root != self.head().beacon_block_root { self.metrics.fork_choice_changed_head.inc(); - let block: BeaconBlock = self + let beacon_block: BeaconBlock = self .store - .get(&new_head_root)? - .ok_or_else(|| Error::MissingBeaconBlock(new_head_root))?; - let state: BeaconState = self - .store - .get(&block.state_root)? - .ok_or_else(|| Error::MissingBeaconState(block.state_root))?; + .get(&beacon_block_root)? + .ok_or_else(|| Error::MissingBeaconBlock(beacon_block_root))?; - // Log if we switched to a new chain. - if present_head_root != block.previous_block_root { + let beacon_state_root = beacon_block.state_root; + let beacon_state: BeaconState = self + .store + .get(&beacon_state_root)? + .ok_or_else(|| Error::MissingBeaconState(beacon_state_root))?; + + // If we switched to a new chain (instead of building atop the present chain). + if self.head().beacon_block_root != beacon_block.previous_block_root { self.metrics.fork_choice_reorg_count.inc(); }; - let state_root = block.state_root; - self.update_canonical_head(block, new_head_root, state.clone(), state_root); - - // Update the canonical `BeaconState`. - self.update_state(state)?; + self.update_canonical_head(CheckPoint { + beacon_block, + beacon_block_root, + beacon_state, + beacon_state_root, + })?; } Ok(()) @@ -863,7 +893,7 @@ impl BeaconChain { loop { let beacon_block_root = last_slot.beacon_block.previous_block_root; - if beacon_block_root == self.spec.zero_hash { + if beacon_block_root == T::EthSpec::spec().zero_hash { break; // Genesis has been reached. } diff --git a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs index cb34e995cd..f5bdfdee15 100644 --- a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs +++ b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs @@ -2,7 +2,7 @@ use crate::{BeaconChainTypes, CheckPoint}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use store::{DBColumn, Error as StoreError, StoreItem}; -use types::BeaconState; +use types::{BeaconState, Hash256}; /// 32-byte key for accessing the `PersistedBeaconChain`. pub const BEACON_CHAIN_DB_KEY: &str = "PERSISTEDBEACONCHAINPERSISTEDBEA"; @@ -10,8 +10,8 @@ pub const BEACON_CHAIN_DB_KEY: &str = "PERSISTEDBEACONCHAINPERSISTEDBEA"; #[derive(Encode, Decode)] pub struct PersistedBeaconChain { pub canonical_head: CheckPoint, - pub finalized_head: CheckPoint, // TODO: operations pool. + pub genesis_block_root: Hash256, pub state: BeaconState, } diff --git a/beacon_node/client/src/beacon_chain_types.rs b/beacon_node/client/src/beacon_chain_types.rs index 24ad93ad44..f84a7fdf65 100644 --- a/beacon_node/client/src/beacon_chain_types.rs +++ b/beacon_node/client/src/beacon_chain_types.rs @@ -1,5 +1,5 @@ use beacon_chain::{ - fork_choice::BitwiseLMDGhost, + fork_choice::OptimizedLMDGhost, slot_clock::SystemTimeSlotClock, store::{DiskStore, MemoryStore, Store}, BeaconChain, BeaconChainTypes, @@ -28,7 +28,7 @@ pub struct TestnetMemoryBeaconChainTypes; impl BeaconChainTypes for TestnetMemoryBeaconChainTypes { type Store = MemoryStore; type SlotClock = SystemTimeSlotClock; - type ForkChoice = BitwiseLMDGhost; + type ForkChoice = OptimizedLMDGhost; type EthSpec = LighthouseTestnetEthSpec; } @@ -45,7 +45,7 @@ pub struct TestnetDiskBeaconChainTypes; impl BeaconChainTypes for TestnetDiskBeaconChainTypes { type Store = DiskStore; type SlotClock = SystemTimeSlotClock; - type ForkChoice = BitwiseLMDGhost; + type ForkChoice = OptimizedLMDGhost; type EthSpec = LighthouseTestnetEthSpec; } diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 4824d40ad2..d29d00ad49 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -128,6 +128,7 @@ where executor, network_send, beacon_chain.clone(), + config.db_name.clone(), metrics_registry, &log, )) diff --git a/beacon_node/http_server/src/api.rs b/beacon_node/http_server/src/api.rs index a910808998..2594f9c288 100644 --- a/beacon_node/http_server/src/api.rs +++ b/beacon_node/http_server/src/api.rs @@ -10,6 +10,7 @@ use persistent::Read; use router::Router; use serde_json::json; use std::sync::Arc; +use types::EthSpec; /// Yields a handler for the HTTP API. pub fn build_handler( @@ -64,7 +65,7 @@ fn handle_fork(req: &mut Request) -> IronResult { @@ -24,3 +25,9 @@ pub struct LocalMetricsKey; impl Key for LocalMetricsKey { type Value = LocalMetrics; } + +pub struct DBPathKey; + +impl Key for DBPathKey { + type Value = PathBuf; +} diff --git a/beacon_node/http_server/src/lib.rs b/beacon_node/http_server/src/lib.rs index cc54b6e17b..fb94348265 100644 --- a/beacon_node/http_server/src/lib.rs +++ b/beacon_node/http_server/src/lib.rs @@ -9,6 +9,7 @@ use network::NetworkMessage; use prometheus::Registry; use router::Router; use slog::{info, o, warn}; +use std::path::PathBuf; use std::sync::Arc; use tokio::runtime::TaskExecutor; @@ -32,6 +33,7 @@ impl Default for HttpServerConfig { /// Build the `iron` HTTP server, defining the core routes. pub fn create_iron_http_server( beacon_chain: Arc>, + db_path: PathBuf, metrics_registry: Registry, ) -> Iron { let mut router = Router::new(); @@ -39,7 +41,7 @@ pub fn create_iron_http_server( // A `GET` request to `/metrics` is handled by the `metrics` module. router.get( "/metrics", - metrics::build_handler(beacon_chain.clone(), metrics_registry), + metrics::build_handler(beacon_chain.clone(), db_path, metrics_registry), "metrics", ); @@ -55,6 +57,7 @@ pub fn start_service( executor: &TaskExecutor, _network_chan: crossbeam_channel::Sender, beacon_chain: Arc>, + db_path: PathBuf, metrics_registry: Registry, log: &slog::Logger, ) -> exit_future::Signal { @@ -66,7 +69,7 @@ pub fn start_service( let (shutdown_trigger, wait_for_shutdown) = exit_future::signal(); // Create an `iron` http, without starting it yet. - let iron = create_iron_http_server(beacon_chain, metrics_registry); + let iron = create_iron_http_server(beacon_chain, db_path, metrics_registry); // Create a HTTP server future. // diff --git a/beacon_node/http_server/src/metrics.rs b/beacon_node/http_server/src/metrics.rs index bfa80ec557..1b1ed1f3d4 100644 --- a/beacon_node/http_server/src/metrics.rs +++ b/beacon_node/http_server/src/metrics.rs @@ -1,19 +1,23 @@ use crate::{ - key::{BeaconChainKey, LocalMetricsKey, MetricsRegistryKey}, + key::{BeaconChainKey, DBPathKey, LocalMetricsKey, MetricsRegistryKey}, map_persistent_err_to_500, }; use beacon_chain::{BeaconChain, BeaconChainTypes}; use iron::prelude::*; use iron::{status::Status, Handler, IronResult, Request, Response}; use persistent::Read; -use prometheus::{Encoder, IntGauge, Opts, Registry, TextEncoder}; -use slot_clock::SlotClock; +use prometheus::{Encoder, Registry, TextEncoder}; +use std::path::PathBuf; use std::sync::Arc; -use types::Slot; + +pub use local_metrics::LocalMetrics; + +mod local_metrics; /// Yields a handler for the metrics endpoint. pub fn build_handler( beacon_chain: Arc>, + db_path: PathBuf, metrics_registry: Registry, ) -> impl Handler { let mut chain = Chain::new(handle_metrics::); @@ -24,43 +28,11 @@ pub fn build_handler( chain.link(Read::>::both(beacon_chain)); chain.link(Read::::both(metrics_registry)); chain.link(Read::::both(local_metrics)); + chain.link(Read::::both(db_path)); chain } -pub struct LocalMetrics { - present_slot: IntGauge, - best_slot: IntGauge, - validator_count: IntGauge, -} - -impl LocalMetrics { - pub fn new() -> Result { - Ok(Self { - present_slot: { - let opts = Opts::new("present_slot", "slot_at_time_of_scrape"); - IntGauge::with_opts(opts)? - }, - best_slot: { - let opts = Opts::new("best_slot", "slot_of_block_at_chain_head"); - IntGauge::with_opts(opts)? - }, - validator_count: { - let opts = Opts::new("validator_count", "number_of_validators"); - IntGauge::with_opts(opts)? - }, - }) - } - - pub fn register(&self, registry: &Registry) -> Result<(), prometheus::Error> { - registry.register(Box::new(self.present_slot.clone()))?; - registry.register(Box::new(self.best_slot.clone()))?; - registry.register(Box::new(self.validator_count.clone()))?; - - Ok(()) - } -} - /// Handle a request for Prometheus metrics. /// /// Returns a text string containing all metrics. @@ -77,18 +49,12 @@ fn handle_metrics(req: &mut Request) -> IronResul .get::>() .map_err(map_persistent_err_to_500)?; - let present_slot = beacon_chain - .slot_clock - .present_slot() - .unwrap_or_else(|_| None) - .unwrap_or_else(|| Slot::new(0)); - local_metrics.present_slot.set(present_slot.as_u64() as i64); + let db_path = req + .get::>() + .map_err(map_persistent_err_to_500)?; - let best_slot = beacon_chain.head().beacon_block.slot; - local_metrics.best_slot.set(best_slot.as_u64() as i64); - - let validator_count = beacon_chain.head().beacon_state.validator_registry.len(); - local_metrics.validator_count.set(validator_count as i64); + // Update metrics that are calculated on each scrape. + local_metrics.update(&beacon_chain, &db_path); let mut buffer = vec![]; let encoder = TextEncoder::new(); diff --git a/beacon_node/http_server/src/metrics/local_metrics.rs b/beacon_node/http_server/src/metrics/local_metrics.rs new file mode 100644 index 0000000000..e2b0e6513b --- /dev/null +++ b/beacon_node/http_server/src/metrics/local_metrics.rs @@ -0,0 +1,94 @@ +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use prometheus::{IntGauge, Opts, Registry}; +use slot_clock::SlotClock; +use std::path::PathBuf; +use std::fs::File; +use types::Slot; + +// If set to `true` will iterate and sum the balances of all validators in the state for each +// scrape. +const SHOULD_SUM_VALIDATOR_BALANCES: bool = true; + +pub struct LocalMetrics { + present_slot: IntGauge, + best_slot: IntGauge, + validator_count: IntGauge, + justified_epoch: IntGauge, + finalized_epoch: IntGauge, + validator_balances_sum: IntGauge, + database_size: IntGauge, +} + +impl LocalMetrics { + /// Create a new instance. + pub fn new() -> Result { + Ok(Self { + present_slot: { + let opts = Opts::new("present_slot", "slot_at_time_of_scrape"); + IntGauge::with_opts(opts)? + }, + best_slot: { + let opts = Opts::new("best_slot", "slot_of_block_at_chain_head"); + IntGauge::with_opts(opts)? + }, + validator_count: { + let opts = Opts::new("validator_count", "number_of_validators"); + IntGauge::with_opts(opts)? + }, + justified_epoch: { + let opts = Opts::new("justified_epoch", "state_justified_epoch"); + IntGauge::with_opts(opts)? + }, + finalized_epoch: { + let opts = Opts::new("finalized_epoch", "state_finalized_epoch"); + IntGauge::with_opts(opts)? + }, + validator_balances_sum: { + let opts = Opts::new("validator_balances_sum", "sum_of_all_validator_balances"); + IntGauge::with_opts(opts)? + }, + database_size: { + let opts = Opts::new("database_size", "size_of_on_disk_db_in_mb"); + IntGauge::with_opts(opts)? + }, + }) + } + + /// Registry this instance with the `registry`. + pub fn register(&self, registry: &Registry) -> Result<(), prometheus::Error> { + registry.register(Box::new(self.present_slot.clone()))?; + registry.register(Box::new(self.best_slot.clone()))?; + registry.register(Box::new(self.validator_count.clone()))?; + registry.register(Box::new(self.finalized_epoch.clone()))?; + registry.register(Box::new(self.justified_epoch.clone()))?; + registry.register(Box::new(self.validator_balances_sum.clone()))?; + registry.register(Box::new(self.database_size.clone()))?; + + Ok(()) + } + + /// Update the metrics in `self` to the latest values. + pub fn update(&self, beacon_chain: &BeaconChain, db_path: &PathBuf) { + let state = &beacon_chain.head().beacon_state; + + let present_slot = beacon_chain + .slot_clock + .present_slot() + .unwrap_or_else(|_| None) + .unwrap_or_else(|| Slot::new(0)); + self.present_slot.set(present_slot.as_u64() as i64); + + self.best_slot.set(state.slot.as_u64() as i64); + self.validator_count.set(state.validator_registry.len() as i64); + self.justified_epoch.set(state.current_justified_epoch.as_u64() as i64); + self.finalized_epoch.set(state.finalized_epoch.as_u64() as i64); + if SHOULD_SUM_VALIDATOR_BALANCES { + self.validator_balances_sum.set(state.validator_balances.iter().sum::() as i64); + } + let db_size = File::open(db_path) + .and_then(|f| f.metadata()) + .and_then(|m| Ok(m.len())) + .unwrap_or(0); + self.database_size.set(db_size as i64); + } +} diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 0ca4f60115..af6cbdfc03 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -859,7 +859,7 @@ fn hello_message(beacon_chain: &BeaconChain) -> HelloMes latest_finalized_root: state.finalized_root, latest_finalized_epoch: state.finalized_epoch, best_root: beacon_chain.head().beacon_block_root, - best_slot: beacon_chain.head().beacon_block.slot, + best_slot: state.slot, } } diff --git a/beacon_node/rpc/src/attestation.rs b/beacon_node/rpc/src/attestation.rs index e764e1b1d1..27fcad7cda 100644 --- a/beacon_node/rpc/src/attestation.rs +++ b/beacon_node/rpc/src/attestation.rs @@ -34,7 +34,7 @@ impl AttestationService for AttestationServiceInstance { // verify the slot, drop lock on state afterwards { let slot_requested = req.get_slot(); - let state = &self.chain.head().beacon_state; + let state = &self.chain.current_state(); // Start by performing some checks // Check that the AttestionData is for the current slot (otherwise it will not be valid) diff --git a/beacon_node/rpc/src/validator.rs b/beacon_node/rpc/src/validator.rs index 4ab9588c4d..68ce60b969 100644 --- a/beacon_node/rpc/src/validator.rs +++ b/beacon_node/rpc/src/validator.rs @@ -30,7 +30,7 @@ impl ValidatorService for ValidatorServiceInstance { trace!(self.log, "RPC request"; "endpoint" => "GetValidatorDuties", "epoch" => req.get_epoch()); let spec = T::EthSpec::spec(); - let state = &self.chain.head().beacon_state; + let state = &self.chain.current_state(); let epoch = Epoch::from(req.get_epoch()); let mut resp = GetDutiesResponse::new(); let resp_validators = resp.mut_active_validators(); diff --git a/validator_client/src/error.rs b/validator_client/src/error.rs index 97500f900b..7740c4f2b6 100644 --- a/validator_client/src/error.rs +++ b/validator_client/src/error.rs @@ -1,6 +1,8 @@ use slot_clock; -use error_chain::error_chain; +use error_chain::{ + error_chain +}; error_chain! { links { }