From 7afbaa807ec4fc66a22eb6db515e02123d8f5996 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 2 Nov 2020 00:37:30 +0000 Subject: [PATCH] Return eth1-related data via the API (#1797) ## Issue Addressed - Related to #1691 ## Proposed Changes Adds the following API endpoints: - `GET lighthouse/eth1/syncing`: status about how synced we are with Eth1. - `GET lighthouse/eth1/block_cache`: all locally cached eth1 blocks. - `GET lighthouse/eth1/deposit_cache`: all locally cached eth1 deposits. Additionally: - Moves some types from the `beacon_node/eth1` to the `common/eth2` crate, so they can be used in the API without duplication. - Allow `update_deposit_cache` and `update_block_cache` to take an optional head block number to avoid duplicate requests. ## Additional Info TBC --- Cargo.lock | 3 + beacon_node/beacon_chain/Cargo.toml | 1 + beacon_node/beacon_chain/src/eth1_chain.rs | 125 ++++++++++++++++++ beacon_node/client/src/builder.rs | 3 +- beacon_node/eth1/Cargo.toml | 1 + beacon_node/eth1/src/block_cache.rs | 36 ++--- beacon_node/eth1/src/deposit_cache.rs | 2 +- beacon_node/eth1/src/deposit_log.rs | 27 ++-- beacon_node/eth1/src/http.rs | 15 ++- beacon_node/eth1/src/inner.rs | 6 +- beacon_node/eth1/src/service.rs | 119 +++++++++++++---- beacon_node/eth1/tests/test.rs | 54 ++++---- .../genesis/src/eth1_genesis_service.rs | 4 +- beacon_node/http_api/src/lib.rs | 91 +++++++++++++ beacon_node/http_api/tests/tests.rs | 36 +++++ book/src/api-lighthouse.md | 121 +++++++++++++++++ common/eth2/Cargo.toml | 3 +- common/eth2/src/lighthouse.rs | 92 ++++++++++++- 18 files changed, 638 insertions(+), 101 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bf633c19db..5f1190eb84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -363,6 +363,7 @@ dependencies = [ "derivative", "environment", "eth1", + "eth2", "eth2_config", "eth2_hashing", "eth2_ssz", @@ -1599,6 +1600,7 @@ version = "0.2.0" dependencies = [ "environment", "eth1_test_rig", + "eth2", "eth2_hashing", "eth2_ssz", "eth2_ssz_derive", @@ -1644,6 +1646,7 @@ dependencies = [ "eth2_keystore", "eth2_libp2p", "eth2_ssz", + "eth2_ssz_derive", "hex", "libsecp256k1", "procinfo", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index c4252e793d..c80df28cde 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -61,3 +61,4 @@ derivative = "2.1.1" itertools = "0.9.0" regex = "1.3.9" exit-future = "0.2.0" +eth2 = { path = "../../common/eth2" } diff --git a/beacon_node/beacon_chain/src/eth1_chain.rs b/beacon_node/beacon_chain/src/eth1_chain.rs index ea455064e2..2ade0e5bc6 100644 --- a/beacon_node/beacon_chain/src/eth1_chain.rs +++ b/beacon_node/beacon_chain/src/eth1_chain.rs @@ -1,5 +1,6 @@ use crate::metrics; use eth1::{Config as Eth1Config, Eth1Block, Service as HttpService}; +use eth2::lighthouse::Eth1SyncStatusData; use eth2_hashing::hash; use slog::{debug, error, trace, Logger}; use ssz::{Decode, Encode}; @@ -9,6 +10,7 @@ use std::cmp::Ordering; use std::collections::HashMap; use std::iter::DoubleEndedIterator; use std::marker::PhantomData; +use std::time::{SystemTime, UNIX_EPOCH}; use store::{DBColumn, Error as StoreError, StoreItem}; use task_executor::TaskExecutor; use types::{ @@ -19,6 +21,11 @@ use types::{ type BlockNumber = u64; type Eth1DataVoteCount = HashMap<(Eth1Data, BlockNumber), u64>; +/// We will declare ourself synced with the Eth1 chain, even if we are this many blocks behind. +/// +/// This number (8) was chosen somewhat arbitrarily. +const ETH1_SYNC_TOLERANCE: u64 = 8; + #[derive(Debug)] pub enum Error { /// Unable to return an Eth1Data for the given epoch. @@ -53,6 +60,84 @@ impl From for Error { } } +/// Returns an `Eth1SyncStatusData` given some parameters: +/// +/// - `latest_cached_block`: The latest eth1 block in our cache, if any. +/// - `head_block`: The block at the very head of our eth1 node (ignoring follow distance, etc). +/// - `genesis_time`: beacon chain genesis time. +/// - `current_slot`: current beacon chain slot. +/// - `spec`: current beacon chain specification. +fn get_sync_status( + latest_cached_block: Option<&Eth1Block>, + head_block: Option<&Eth1Block>, + genesis_time: u64, + current_slot: Slot, + spec: &ChainSpec, +) -> Option { + let period = T::SlotsPerEth1VotingPeriod::to_u64(); + // Since `period` is a "constant", we assume it is set sensibly. + let voting_period_start_slot = (current_slot / period) * period; + let voting_period_start_timestamp = { + let period_start = slot_start_seconds::( + genesis_time, + spec.milliseconds_per_slot, + voting_period_start_slot, + ); + let eth1_follow_distance_seconds = spec + .seconds_per_eth1_block + .saturating_mul(spec.eth1_follow_distance); + + period_start.saturating_sub(eth1_follow_distance_seconds) + }; + + let latest_cached_block_number = latest_cached_block.map(|b| b.number); + let latest_cached_block_timestamp = latest_cached_block.map(|b| b.timestamp); + let head_block_number = head_block.map(|b| b.number); + let head_block_timestamp = head_block.map(|b| b.timestamp); + + let eth1_node_sync_status_percentage = if let Some(head_block) = head_block { + let now = SystemTime::now().duration_since(UNIX_EPOCH).ok()?.as_secs(); + let head_age = now.saturating_sub(head_block.timestamp); + + if head_age < ETH1_SYNC_TOLERANCE * spec.seconds_per_eth1_block { + // Always indicate we are fully synced if it's within the sync threshold. + 100.0 + } else { + let blocks_behind = head_age + .checked_div(spec.seconds_per_eth1_block) + .unwrap_or(0); + + let part = f64::from(head_block.number as u32); + let whole = f64::from(head_block.number.saturating_add(blocks_behind) as u32); + + if whole > 0.0 { + (part / whole) * 100.0 + } else { + // Avoids a divide-by-zero. + 0.0 + } + } + } else { + // Always return 0% synced if the head block of the eth1 chain is unknown. + 0.0 + }; + + // Lighthouse is "cached and ready" when it has cached enough blocks to cover the start of the + // current voting period. + let lighthouse_is_cached_and_ready = + latest_cached_block_timestamp.map_or(false, |t| t >= voting_period_start_timestamp); + + Some(Eth1SyncStatusData { + head_block_number, + head_block_timestamp, + latest_cached_block_number, + latest_cached_block_timestamp, + voting_period_start_timestamp, + eth1_node_sync_status_percentage, + lighthouse_is_cached_and_ready, + }) +} + #[derive(Encode, Decode, Clone)] pub struct SszEth1 { use_dummy_backend: bool, @@ -143,6 +228,22 @@ where } } + /// Returns a status indicating how synced our caches are with the eth1 chain. + pub fn sync_status( + &self, + genesis_time: u64, + current_slot: Slot, + spec: &ChainSpec, + ) -> Option { + get_sync_status::( + self.backend.latest_cached_block().as_ref(), + self.backend.head_block().as_ref(), + genesis_time, + current_slot, + spec, + ) + } + /// Instantiate `Eth1Chain` from a persisted `SszEth1`. /// /// The `Eth1Chain` will have the same caches as the persisted `SszEth1`. @@ -195,6 +296,14 @@ pub trait Eth1ChainBackend: Sized + Send + Sync { spec: &ChainSpec, ) -> Result, Error>; + /// Returns the latest block stored in the cache. Used to obtain an idea of how up-to-date the + /// beacon node eth1 cache is. + fn latest_cached_block(&self) -> Option; + + /// Returns the block at the head of the chain (ignoring follow distance, etc). Used to obtain + /// an idea of how up-to-date the remote eth1 node is. + fn head_block(&self) -> Option; + /// Encode the `Eth1ChainBackend` instance to bytes. fn as_bytes(&self) -> Vec; @@ -241,6 +350,14 @@ impl Eth1ChainBackend for DummyEth1ChainBackend { Ok(vec![]) } + fn latest_cached_block(&self) -> Option { + None + } + + fn head_block(&self) -> Option { + None + } + /// Return empty Vec for dummy backend. fn as_bytes(&self) -> Vec { Vec::new() @@ -400,6 +517,14 @@ impl Eth1ChainBackend for CachingEth1Backend { } } + fn latest_cached_block(&self) -> Option { + self.core.latest_cached_block() + } + + fn head_block(&self) -> Option { + self.core.head_block() + } + /// Return encoded byte representation of the block and deposit caches. fn as_bytes(&self) -> Vec { self.core.as_bytes() diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index ec039aeaed..9d0e6c25f6 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -334,6 +334,7 @@ where chain: self.beacon_chain.clone(), network_tx: self.network_send.clone(), network_globals: self.network_globals.clone(), + eth1_service: self.eth1_service.clone(), log: log.clone(), }); @@ -590,7 +591,7 @@ where })? }; - self.eth1_service = None; + self.eth1_service = Some(backend.core.clone()); // Starts the service that connects to an eth1 node and periodically updates caches. backend.start(context.executor); diff --git a/beacon_node/eth1/Cargo.toml b/beacon_node/eth1/Cargo.toml index 602c3e90c4..bb7d016705 100644 --- a/beacon_node/eth1/Cargo.toml +++ b/beacon_node/eth1/Cargo.toml @@ -31,3 +31,4 @@ libflate = "1.0.2" lighthouse_metrics = { path = "../../common/lighthouse_metrics"} lazy_static = "1.4.0" task_executor = { path = "../../common/task_executor" } +eth2 = { path = "../../common/eth2" } diff --git a/beacon_node/eth1/src/block_cache.rs b/beacon_node/eth1/src/block_cache.rs index 4bcd23740e..5999944f4a 100644 --- a/beacon_node/eth1/src/block_cache.rs +++ b/beacon_node/eth1/src/block_cache.rs @@ -1,6 +1,7 @@ use ssz_derive::{Decode, Encode}; use std::ops::RangeInclusive; -use types::{Eth1Data, Hash256}; + +pub use eth2::lighthouse::Eth1Block; #[derive(Debug, PartialEq, Clone)] pub enum Error { @@ -15,28 +16,6 @@ pub enum Error { Internal(String), } -/// A block of the eth1 chain. -/// -/// Contains all information required to add a `BlockCache` entry. -#[derive(Debug, PartialEq, Clone, Eq, Hash, Encode, Decode)] -pub struct Eth1Block { - pub hash: Hash256, - pub timestamp: u64, - pub number: u64, - pub deposit_root: Option, - pub deposit_count: Option, -} - -impl Eth1Block { - pub fn eth1_data(self) -> Option { - Some(Eth1Data { - deposit_root: self.deposit_root?, - deposit_count: self.deposit_count?, - block_hash: self.hash, - }) - } -} - /// Stores block and deposit contract information and provides queries based upon the block /// timestamp. #[derive(Debug, PartialEq, Clone, Default, Encode, Decode)] @@ -55,6 +34,16 @@ impl BlockCache { self.blocks.is_empty() } + /// Returns the earliest (lowest timestamp) block, if any. + pub fn earliest_block(&self) -> Option<&Eth1Block> { + self.blocks.first() + } + + /// Returns the latest (highest timestamp) block, if any. + pub fn latest_block(&self) -> Option<&Eth1Block> { + self.blocks.last() + } + /// Returns the timestamp of the earliest block in the cache (if any). pub fn earliest_block_timestamp(&self) -> Option { self.blocks.first().map(|block| block.timestamp) @@ -181,6 +170,7 @@ impl BlockCache { #[cfg(test)] mod tests { use super::*; + use types::Hash256; fn get_block(i: u64, interval_secs: u64) -> Eth1Block { Eth1Block { diff --git a/beacon_node/eth1/src/deposit_cache.rs b/beacon_node/eth1/src/deposit_cache.rs index ca9f0b0c56..b1bb0c9f5f 100644 --- a/beacon_node/eth1/src/deposit_cache.rs +++ b/beacon_node/eth1/src/deposit_cache.rs @@ -304,7 +304,7 @@ pub mod tests { block_number: 42, data: EXAMPLE_LOG.to_vec(), }; - DepositLog::from_log(&log, &spec).expect("should decode log") + log.to_deposit_log(&spec).expect("should decode log") } #[test] diff --git a/beacon_node/eth1/src/deposit_log.rs b/beacon_node/eth1/src/deposit_log.rs index ab5a85a02c..22ead3db85 100644 --- a/beacon_node/eth1/src/deposit_log.rs +++ b/beacon_node/eth1/src/deposit_log.rs @@ -1,11 +1,12 @@ use super::http::Log; use ssz::Decode; -use ssz_derive::{Decode, Encode}; use state_processing::per_block_processing::signature_sets::{ deposit_pubkey_signature_message, deposit_signature_set, }; use types::{ChainSpec, DepositData, Hash256, PublicKeyBytes, SignatureBytes}; +pub use eth2::lighthouse::DepositLog; + /// The following constants define the layout of bytes in the deposit contract `DepositEvent`. The /// event bytes are formatted according to the Ethereum ABI. const PUBKEY_START: usize = 192; @@ -19,22 +20,10 @@ const SIG_LEN: usize = 96; const INDEX_START: usize = SIG_START + 96 + 32; const INDEX_LEN: usize = 8; -/// A fully parsed eth1 deposit contract log. -#[derive(Debug, PartialEq, Clone, Encode, Decode)] -pub struct DepositLog { - pub deposit_data: DepositData, - /// The block number of the log that included this `DepositData`. - pub block_number: u64, - /// The index included with the deposit log. - pub index: u64, - /// True if the signature is valid. - pub signature_is_valid: bool, -} - -impl DepositLog { +impl Log { /// Attempts to parse a raw `Log` from the deposit contract into a `DepositLog`. - pub fn from_log(log: &Log, spec: &ChainSpec) -> Result { - let bytes = &log.data; + pub fn to_deposit_log(&self, spec: &ChainSpec) -> Result { + let bytes = &self.data; let pubkey = bytes .get(PUBKEY_START..PUBKEY_START + PUBKEY_LEN) @@ -68,7 +57,7 @@ impl DepositLog { Ok(DepositLog { deposit_data, - block_number: log.block_number, + block_number: self.block_number, index: u64::from_ssz_bytes(index).map_err(|e| format!("Invalid index ssz: {:?}", e))?, signature_is_valid, }) @@ -77,7 +66,6 @@ impl DepositLog { #[cfg(test)] pub mod tests { - use super::*; use crate::http::Log; use types::{EthSpec, MainnetEthSpec}; @@ -113,6 +101,7 @@ pub mod tests { block_number: 42, data: EXAMPLE_LOG.to_vec(), }; - DepositLog::from_log(&log, &MainnetEthSpec::default_spec()).expect("should decode log"); + log.to_deposit_log(&MainnetEthSpec::default_spec()) + .expect("should decode log"); } } diff --git a/beacon_node/eth1/src/http.rs b/beacon_node/eth1/src/http.rs index e8f7d23a02..9fa14a7b7a 100644 --- a/beacon_node/eth1/src/http.rs +++ b/beacon_node/eth1/src/http.rs @@ -39,6 +39,13 @@ pub enum Eth1NetworkId { Custom(u64), } +/// Used to identify a block when querying the Eth1 node. +#[derive(Clone, Copy)] +pub enum BlockQuery { + Number(u64), + Latest, +} + impl Into for Eth1NetworkId { fn into(self) -> u64 { match self { @@ -107,11 +114,15 @@ pub async fn get_block_number(endpoint: &str, timeout: Duration) -> Result Result { + let query_param = match query { + BlockQuery::Number(block_number) => format!("0x{:x}", block_number), + BlockQuery::Latest => "latest".to_string(), + }; let params = json!([ - format!("0x{:x}", block_number), + query_param, false // do not return full tx objects. ]); diff --git a/beacon_node/eth1/src/inner.rs b/beacon_node/eth1/src/inner.rs index 4121c6c84b..718080a1e5 100644 --- a/beacon_node/eth1/src/inner.rs +++ b/beacon_node/eth1/src/inner.rs @@ -1,6 +1,6 @@ use crate::Config; use crate::{ - block_cache::BlockCache, + block_cache::{BlockCache, Eth1Block}, deposit_cache::{DepositCache, SszDepositCache}, }; use parking_lot::RwLock; @@ -29,6 +29,7 @@ pub struct Inner { pub block_cache: RwLock, pub deposit_cache: RwLock, pub config: RwLock, + pub remote_head_block: RwLock>, pub spec: ChainSpec, } @@ -86,6 +87,9 @@ impl SszEth1Cache { cache: self.deposit_cache.to_deposit_cache()?, last_processed_block: self.last_processed_block, }), + // Set the remote head_block zero when creating a new instance. We only care about + // present and future eth1 nodes. + remote_head_block: RwLock::new(None), config: RwLock::new(config), spec, }) diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index e565f03473..47e8c38610 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -3,10 +3,10 @@ use crate::{ block_cache::{BlockCache, Error as BlockCacheError, Eth1Block}, deposit_cache::Error as DepositCacheError, http::{ - get_block, get_block_number, get_deposit_logs_in_range, get_network_id, Eth1NetworkId, Log, + get_block, get_block_number, get_deposit_logs_in_range, get_network_id, BlockQuery, + Eth1NetworkId, Log, }, inner::{DepositUpdater, Inner}, - DepositLog, }; use futures::{future::TryFutureExt, stream, stream::TryStreamExt, StreamExt}; use parking_lot::{RwLock, RwLockReadGuard}; @@ -148,6 +148,7 @@ impl Service { deposit_cache: RwLock::new(DepositUpdater::new( config.deposit_contract_deploy_block, )), + remote_head_block: RwLock::new(None), config: RwLock::new(config), spec, }), @@ -206,6 +207,21 @@ impl Service { self.inner.block_cache.read().latest_block_timestamp() } + /// Returns the latest head block returned from an Eth1 node. + /// + /// ## Note + /// + /// This is the simply the head of the Eth1 chain, with no regard to follow distance or the + /// voting period start. + pub fn head_block(&self) -> Option { + self.inner.remote_head_block.read().as_ref().cloned() + } + + /// Returns the latest cached block. + pub fn latest_cached_block(&self) -> Option { + self.inner.block_cache.read().latest_block().cloned() + } + /// Returns the lowest block number stored. pub fn lowest_block_number(&self) -> Option { self.inner.block_cache.read().lowest_block_number() @@ -301,9 +317,16 @@ impl Service { pub async fn update( &self, ) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> { + let remote_head_block = download_eth1_block(self.inner.clone(), None) + .map_err(|e| format!("Failed to update Eth1 service: {:?}", e)) + .await?; + let remote_head_block_number = Some(remote_head_block.number); + + *self.inner.remote_head_block.write() = Some(remote_head_block); + let update_deposit_cache = async { let outcome = self - .update_deposit_cache() + .update_deposit_cache(remote_head_block_number) .await .map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?; @@ -314,12 +337,12 @@ impl Service { "logs_imported" => outcome.logs_imported, "last_processed_eth1_block" => self.inner.deposit_cache.read().last_processed_block, ); - Ok(outcome) + Ok::<_, String>(outcome) }; let update_block_cache = async { let outcome = self - .update_block_cache() + .update_block_cache(remote_head_block_number) .await .map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?; @@ -330,10 +353,13 @@ impl Service { "blocks_imported" => outcome.blocks_imported, "head_block" => outcome.head_block_number, ); - Ok(outcome) + Ok::<_, String>(outcome) }; - futures::try_join!(update_deposit_cache, update_block_cache) + let (deposit_outcome, block_outcome) = + futures::try_join!(update_deposit_cache, update_block_cache)?; + + Ok((deposit_outcome, block_outcome)) } /// A looping future that updates the cache, then waits `config.auto_update_interval` before @@ -413,13 +439,19 @@ impl Service { /// Will process no more than `BLOCKS_PER_LOG_QUERY * MAX_LOG_REQUESTS_PER_UPDATE` blocks in a /// single update. /// + /// If `remote_highest_block_opt` is `Some`, use that value instead of querying `self.endpoint` + /// for the head of the eth1 chain. + /// /// ## Resolves with /// /// - Ok(_) if the update was successful (the cache may or may not have been modified). /// - Err(_) if there is an error. /// /// Emits logs for debugging and errors. - pub async fn update_deposit_cache(&self) -> Result { + pub async fn update_deposit_cache( + &self, + remote_highest_block_opt: Option, + ) -> Result { let endpoint = self.config().endpoint.clone(); let follow_distance = self.config().follow_distance; let deposit_contract_address = self.config().deposit_contract_address.clone(); @@ -437,7 +469,13 @@ impl Service { .map(|n| n + 1) .unwrap_or_else(|| self.config().deposit_contract_deploy_block); - let range = get_new_block_numbers(&endpoint, next_required_block, follow_distance).await?; + let range = get_new_block_numbers( + &endpoint, + remote_highest_block_opt, + next_required_block, + follow_distance, + ) + .await?; let block_number_chunks = if let Some(range) = range { range @@ -483,7 +521,7 @@ impl Service { log_chunk .iter() .map(|raw_log| { - DepositLog::from_log(&raw_log, self.inner.spec()).map_err(|error| { + raw_log.to_deposit_log(self.inner.spec()).map_err(|error| { Error::FailedToParseDepositLog { block_range: block_range.clone(), error, @@ -548,13 +586,19 @@ impl Service { /// /// If configured, prunes the block cache after importing new blocks. /// + /// If `remote_highest_block_opt` is `Some`, use that value instead of querying `self.endpoint` + /// for the head of the eth1 chain. + /// /// ## Resolves with /// /// - Ok(_) if the update was successful (the cache may or may not have been modified). /// - Err(_) if there is an error. /// /// Emits logs for debugging and errors. - pub async fn update_block_cache(&self) -> Result { + pub async fn update_block_cache( + &self, + remote_highest_block_opt: Option, + ) -> Result { let block_cache_truncation = self.config().block_cache_truncation; let max_blocks_per_update = self .config() @@ -572,7 +616,13 @@ impl Service { let endpoint = self.config().endpoint.clone(); let follow_distance = self.config().follow_distance; - let range = get_new_block_numbers(&endpoint, next_required_block, follow_distance).await?; + let range = get_new_block_numbers( + &endpoint, + remote_highest_block_opt, + next_required_block, + follow_distance, + ) + .await?; // Map the range of required blocks into a Vec. // // If the required range is larger than the size of the cache, drop the exiting cache @@ -623,7 +673,7 @@ impl Service { |mut block_numbers| async { match block_numbers.next() { Some(block_number) => { - match download_eth1_block(self.inner.clone(), block_number).await { + match download_eth1_block(self.inner.clone(), Some(block_number)).await { Ok(eth1_block) => Ok(Some((eth1_block, block_numbers))), Err(e) => Err(e), } @@ -708,13 +758,17 @@ impl Service { /// the locally stored best block. async fn get_new_block_numbers<'a>( endpoint: &str, + remote_highest_block_opt: Option, next_required_block: u64, follow_distance: u64, ) -> Result>, Error> { - let remote_highest_block = + let remote_highest_block = if let Some(block_number) = remote_highest_block_opt { + block_number + } else { get_block_number(endpoint, Duration::from_millis(BLOCK_NUMBER_TIMEOUT_MILLIS)) .map_err(Error::GetBlockNumberFailed) - .await?; + .await? + }; let remote_follow_block = remote_highest_block.saturating_sub(follow_distance); if next_required_block <= remote_follow_block { @@ -739,26 +793,37 @@ async fn get_new_block_numbers<'a>( /// Downloads the `(block, deposit_root, deposit_count)` tuple from an eth1 node for the given /// `block_number`. /// +/// Set `block_number_opt = None` to get the "latest" eth1 block (i.e., the head). +/// /// Performs three async calls to an Eth1 HTTP JSON RPC endpoint. -async fn download_eth1_block(cache: Arc, block_number: u64) -> Result { +async fn download_eth1_block( + cache: Arc, + block_number_opt: Option, +) -> Result { let endpoint = cache.config.read().endpoint.clone(); - let deposit_root = cache - .deposit_cache - .read() - .cache - .get_deposit_root_from_cache(block_number); + let deposit_root = block_number_opt.and_then(|block_number| { + cache + .deposit_cache + .read() + .cache + .get_deposit_root_from_cache(block_number) + }); - let deposit_count = cache - .deposit_cache - .read() - .cache - .get_deposit_count_from_cache(block_number); + let deposit_count = block_number_opt.and_then(|block_number| { + cache + .deposit_cache + .read() + .cache + .get_deposit_count_from_cache(block_number) + }); // Performs a `get_blockByNumber` call to an eth1 node. let http_block = get_block( &endpoint, - block_number, + block_number_opt + .map(BlockQuery::Number) + .unwrap_or_else(|| BlockQuery::Latest), Duration::from_millis(GET_BLOCK_TIMEOUT_MILLIS), ) .map_err(Error::BlockDownloadFailed) diff --git a/beacon_node/eth1/tests/test.rs b/beacon_node/eth1/tests/test.rs index fa11aca46d..399890213f 100644 --- a/beacon_node/eth1/tests/test.rs +++ b/beacon_node/eth1/tests/test.rs @@ -1,8 +1,8 @@ #![cfg(test)] use environment::{Environment, EnvironmentBuilder}; use eth1::http::{get_deposit_count, get_deposit_logs_in_range, get_deposit_root, Block, Log}; +use eth1::DepositCache; use eth1::{Config, Service}; -use eth1::{DepositCache, DepositLog}; use eth1_test_rig::GanacheEth1Instance; use futures::compat::Future01CompatExt; use merkle_proof::verify_merkle_proof; @@ -146,16 +146,16 @@ mod eth1_cache { } service - .update_deposit_cache() + .update_deposit_cache(None) .await .expect("should update deposit cache"); service - .update_block_cache() + .update_block_cache(None) .await .expect("should update block cache"); service - .update_block_cache() + .update_block_cache(None) .await .expect("should update cache when nothing has changed"); @@ -209,11 +209,11 @@ mod eth1_cache { } service - .update_deposit_cache() + .update_deposit_cache(None) .await .expect("should update deposit cache"); service - .update_block_cache() + .update_block_cache(None) .await .expect("should update block cache"); @@ -256,11 +256,11 @@ mod eth1_cache { eth1.ganache.evm_mine().await.expect("should mine block") } service - .update_deposit_cache() + .update_deposit_cache(None) .await .expect("should update deposit cache"); service - .update_block_cache() + .update_block_cache(None) .await .expect("should update block cache"); } @@ -300,12 +300,15 @@ mod eth1_cache { eth1.ganache.evm_mine().await.expect("should mine block") } futures::try_join!( - service.update_deposit_cache(), - service.update_deposit_cache() + service.update_deposit_cache(None), + service.update_deposit_cache(None) ) .expect("should perform two simultaneous updates of deposit cache"); - futures::try_join!(service.update_block_cache(), service.update_block_cache()) - .expect("should perform two simultaneous updates of block cache"); + futures::try_join!( + service.update_block_cache(None), + service.update_block_cache(None) + ) + .expect("should perform two simultaneous updates of block cache"); assert!(service.block_cache_len() >= n, "should grow the cache"); } @@ -351,12 +354,12 @@ mod deposit_tree { } service - .update_deposit_cache() + .update_deposit_cache(None) .await .expect("should perform update"); service - .update_deposit_cache() + .update_deposit_cache(None) .await .expect("should perform update when nothing has changed"); @@ -426,8 +429,8 @@ mod deposit_tree { } futures::try_join!( - service.update_deposit_cache(), - service.update_deposit_cache() + service.update_deposit_cache(None), + service.update_deposit_cache(None) ) .expect("should perform two updates concurrently"); @@ -477,7 +480,7 @@ mod deposit_tree { let logs: Vec<_> = blocking_deposit_logs(ð1, 0..block_number) .await .iter() - .map(|raw| DepositLog::from_log(raw, spec).expect("should parse deposit log")) + .map(|raw| raw.to_deposit_log(spec).expect("should parse deposit log")) .inspect(|log| { tree.insert_log(log.clone()) .expect("should add consecutive logs") @@ -535,11 +538,16 @@ mod deposit_tree { /// Tests for the base HTTP requests and response handlers. mod http { use super::*; + use eth1::http::BlockQuery; async fn get_block(eth1: &GanacheEth1Instance, block_number: u64) -> Block { - eth1::http::get_block(ð1.endpoint(), block_number, timeout()) - .await - .expect("should get block number") + eth1::http::get_block( + ð1.endpoint(), + BlockQuery::Number(block_number), + timeout(), + ) + .await + .expect("should get block number") } #[tokio::test] @@ -668,7 +676,7 @@ mod fast { } service - .update_deposit_cache() + .update_deposit_cache(None) .await .expect("should perform update"); @@ -736,7 +744,7 @@ mod persist { } service - .update_deposit_cache() + .update_deposit_cache(None) .await .expect("should perform update"); @@ -748,7 +756,7 @@ mod persist { let deposit_count = service.deposit_cache_len(); service - .update_block_cache() + .update_block_cache(None) .await .expect("should perform update"); diff --git a/beacon_node/genesis/src/eth1_genesis_service.rs b/beacon_node/genesis/src/eth1_genesis_service.rs index 4aa60da7a6..e16e577af7 100644 --- a/beacon_node/genesis/src/eth1_genesis_service.rs +++ b/beacon_node/genesis/src/eth1_genesis_service.rs @@ -114,7 +114,7 @@ impl Eth1GenesisService { loop { let update_result = eth1_service - .update_deposit_cache() + .update_deposit_cache(None) .await .map_err(|e| format!("{:?}", e)); @@ -156,7 +156,7 @@ impl Eth1GenesisService { } // Download new eth1 blocks into the cache. - let blocks_imported = match eth1_service.update_block_cache().await { + let blocks_imported = match eth1_service.update_block_cache(None).await { Ok(outcome) => { debug!( log, diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index cb74e66f49..52f6685cf0 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -63,6 +63,7 @@ pub struct Context { pub chain: Option>>, pub network_tx: Option>>, pub network_globals: Option>>, + pub eth1_service: Option, pub log: Logger, } @@ -300,6 +301,19 @@ pub fn serve( } }); + // Create a `warp` filter that provides access to the Eth1 service. + let inner_ctx = ctx.clone(); + let eth1_service_filter = warp::any() + .map(move || inner_ctx.eth1_service.clone()) + .and_then(|eth1_service| async move { + match eth1_service { + Some(eth1_service) => Ok(eth1_service), + None => Err(warp_utils::reject::custom_not_found( + "The Eth1 service is not started. Use --eth1 on the CLI.".to_string(), + )), + } + }); + // Create a `warp` filter that rejects request whilst the node is syncing. let not_while_syncing_filter = warp::any() .and(network_globals.clone()) @@ -1806,6 +1820,80 @@ pub fn serve( }) }); + // GET lighthouse/eth1/syncing + let get_lighthouse_eth1_syncing = warp::path("lighthouse") + .and(warp::path("eth1")) + .and(warp::path("syncing")) + .and(warp::path::end()) + .and(chain_filter.clone()) + .and_then(|chain: Arc>| { + blocking_json_task(move || { + let head_info = chain + .head_info() + .map_err(warp_utils::reject::beacon_chain_error)?; + let current_slot = chain + .slot() + .map_err(warp_utils::reject::beacon_chain_error)?; + + chain + .eth1_chain + .as_ref() + .ok_or_else(|| { + warp_utils::reject::custom_not_found( + "Eth1 sync is disabled. See the --eth1 CLI flag.".to_string(), + ) + }) + .and_then(|eth1| { + eth1.sync_status(head_info.genesis_time, current_slot, &chain.spec) + .ok_or_else(|| { + warp_utils::reject::custom_server_error( + "Unable to determine Eth1 sync status".to_string(), + ) + }) + }) + .map(api_types::GenericResponse::from) + }) + }); + + // GET lighthouse/eth1/block_cache + let get_lighthouse_eth1_block_cache = warp::path("lighthouse") + .and(warp::path("eth1")) + .and(warp::path("block_cache")) + .and(warp::path::end()) + .and(eth1_service_filter.clone()) + .and_then(|eth1_service: eth1::Service| { + blocking_json_task(move || { + Ok(api_types::GenericResponse::from( + eth1_service + .blocks() + .read() + .iter() + .cloned() + .collect::>(), + )) + }) + }); + + // GET lighthouse/eth1/deposit_cache + let get_lighthouse_eth1_deposit_cache = warp::path("lighthouse") + .and(warp::path("eth1")) + .and(warp::path("deposit_cache")) + .and(warp::path::end()) + .and(eth1_service_filter) + .and_then(|eth1_service: eth1::Service| { + blocking_json_task(move || { + Ok(api_types::GenericResponse::from( + eth1_service + .deposits() + .read() + .cache + .iter() + .cloned() + .collect::>(), + )) + }) + }); + // GET lighthouse/beacon/states/{state_id}/ssz let get_lighthouse_beacon_states_ssz = warp::path("lighthouse") .and(warp::path("beacon")) @@ -1872,6 +1960,9 @@ pub fn serve( .or(get_lighthouse_proto_array.boxed()) .or(get_lighthouse_validator_inclusion_global.boxed()) .or(get_lighthouse_validator_inclusion.boxed()) + .or(get_lighthouse_eth1_syncing.boxed()) + .or(get_lighthouse_eth1_block_cache.boxed()) + .or(get_lighthouse_eth1_deposit_cache.boxed()) .or(get_lighthouse_beacon_states_ssz.boxed()) .boxed(), ) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index d7bbcfe48e..532348a2c3 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -169,6 +169,9 @@ impl ApiTester { *network_globals.sync_state.write() = SyncState::Synced; + let eth1_service = + eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone()); + let context = Arc::new(Context { config: Config { enabled: true, @@ -179,6 +182,7 @@ impl ApiTester { chain: Some(chain.clone()), network_tx: Some(network_tx), network_globals: Some(Arc::new(network_globals)), + eth1_service: Some(eth1_service), log, }); let ctx = context.clone(); @@ -1643,6 +1647,32 @@ impl ApiTester { self } + pub async fn test_get_lighthouse_eth1_syncing(self) -> Self { + self.client.get_lighthouse_eth1_syncing().await.unwrap(); + + self + } + + pub async fn test_get_lighthouse_eth1_block_cache(self) -> Self { + let blocks = self.client.get_lighthouse_eth1_block_cache().await.unwrap(); + + assert!(blocks.data.is_empty()); + + self + } + + pub async fn test_get_lighthouse_eth1_deposit_cache(self) -> Self { + let deposits = self + .client + .get_lighthouse_eth1_deposit_cache() + .await + .unwrap(); + + assert!(deposits.data.is_empty()); + + self + } + pub async fn test_get_lighthouse_beacon_states_ssz(self) -> Self { for state_id in self.interesting_state_ids() { let result = self @@ -1920,6 +1950,12 @@ async fn lighthouse_endpoints() { .await .test_get_lighthouse_validator_inclusion_global() .await + .test_get_lighthouse_eth1_syncing() + .await + .test_get_lighthouse_eth1_block_cache() + .await + .test_get_lighthouse_eth1_deposit_cache() + .await .test_get_lighthouse_beacon_states_ssz() .await; } diff --git a/book/src/api-lighthouse.md b/book/src/api-lighthouse.md index 2db412f75d..d93e8d2d7c 100644 --- a/book/src/api-lighthouse.md +++ b/book/src/api-lighthouse.md @@ -178,6 +178,127 @@ See [Validator Inclusion APIs](./validator-inclusion.md). See [Validator Inclusion APIs](./validator-inclusion.md). +### `/lighthouse/eth1/syncing` + +Returns information regarding the Eth1 network, as it is required for use in +Eth2 + +#### Fields + +- `head_block_number`, `head_block_timestamp`: the block number and timestamp +from the very head of the Eth1 chain. Useful for understanding the immediate +health of the Eth1 node that the beacon node is connected to. +- `latest_cached_block_number` & `latest_cached_block_timestamp`: the block +number and timestamp of the latest block we have in our block cache. + - For correct Eth1 voting this timestamp should be later than the +`voting_period_start_timestamp`. +- `voting_period_start_timestamp`: the start of the period where block + producers must include votes for blocks in the Eth1 chain. Provided for + reference. +- `eth1_node_sync_status_percentage` (float): An estimate of how far the head of the + Eth1 node is from the head of the Eth1 chain. + - `100.0` indicates a fully synced Eth1 node. + - `0.0` indicates an Eth1 node that has not verified any blocks past the + genesis block. +- `lighthouse_is_cached_and_ready`: Is set to `true` if the caches in the + beacon node are ready for block production. + - This value might be set to + `false` whilst `eth1_node_sync_status_percentage == 100.0` if the beacon + node is still building its internal cache. + - This value might be set to `true` whilst + `eth1_node_sync_status_percentage < 100.0` since the cache only cares + about blocks a certain distance behind the head. + +#### Example + +```bash +curl -X GET "http://localhost:5052/lighthouse/eth1/syncing" -H "accept: application/json" | jq +``` + +```json +{ + "data": { + "head_block_number": 3611806, + "head_block_timestamp": 1603249317, + "latest_cached_block_number": 3610758, + "latest_cached_block_timestamp": 1603233597, + "voting_period_start_timestamp": 1603228632, + "eth1_node_sync_status_percentage": 100, + "lighthouse_is_cached_and_ready": true + } +} +``` + +### `/lighthouse/eth1/block_cache` + +Returns a list of all the Eth1 blocks in the Eth1 voting cache. + +#### Example + +```bash +curl -X GET "http://localhost:5052/lighthouse/eth1/block_cache" -H "accept: application/json" | jq +``` + +```json +{ + "data": [ + { + "hash": "0x3a17f4b7ae4ee57ef793c49ebc9c06ff85207a5e15a1d0bd37b68c5ef5710d7f", + "timestamp": 1603173338, + "number": 3606741, + "deposit_root": "0xd24920d936e8fb9b67e93fd126ce1d9e14058b6d82dcf7d35aea46879fae6dee", + "deposit_count": 88911 + }, + { + "hash": "0x78852954ea4904e5f81038f175b2adefbede74fbb2338212964405443431c1e7", + "timestamp": 1603173353, + "number": 3606742, + "deposit_root": "0xd24920d936e8fb9b67e93fd126ce1d9e14058b6d82dcf7d35aea46879fae6dee", + "deposit_count": 88911 + } + ] +} +``` + +### `/lighthouse/eth1/deposit_cache` + +Returns a list of all cached logs from the deposit contract. + +#### Example + +```bash +curl -X GET "http://localhost:5052/lighthouse/eth1/deposit_cache" -H "accept: application/json" | jq +``` + +```json +{ + "data": [ + { + "deposit_data": { + "pubkey": "0xae9e6a550ac71490cdf134533b1688fcbdb16f113d7190eacf4f2e9ca6e013d5bd08c37cb2bde9bbdec8ffb8edbd495b", + "withdrawal_credentials": "0x0062a90ebe71c4c01c4e057d7d13b944d9705f524ebfa24290c22477ab0517e4", + "amount": "32000000000", + "signature": "0xa87a4874d276982c471e981a113f8af74a31ffa7d18898a02df2419de2a7f02084065784aa2f743d9ddf80952986ea0b012190cd866f1f2d9c633a7a33c2725d0b181906d413c82e2c18323154a2f7c7ae6f72686782ed9e423070daa00db05b" + }, + "block_number": 3086571, + "index": 0, + "signature_is_valid": false + }, + { + "deposit_data": { + "pubkey": "0xb1d0ec8f907e023ea7b8cb1236be8a74d02ba3f13aba162da4a68e9ffa2e395134658d150ef884bcfaeecdf35c286496", + "withdrawal_credentials": "0x00a6aa2a632a6c4847cf87ef96d789058eb65bfaa4cc4e0ebc39237421c22e54", + "amount": "32000000000", + "signature": "0x8d0f8ec11935010202d6dde9ab437f8d835b9cfd5052c001be5af9304f650ada90c5363022e1f9ef2392dd222cfe55b40dfd52578468d2b2092588d4ad3745775ea4d8199216f3f90e57c9435c501946c030f7bfc8dbd715a55effa6674fd5a4" + }, + "block_number": 3086579, + "index": 1, + "signature_is_valid": false + } + ] +} +``` + ### `/lighthouse/beacon/states/{state_id}/ssz` Obtains a `BeaconState` in SSZ bytes. Useful for obtaining a genesis state. diff --git a/common/eth2/Cargo.toml b/common/eth2/Cargo.toml index b89c5d7c06..9bd37a9b1c 100644 --- a/common/eth2/Cargo.toml +++ b/common/eth2/Cargo.toml @@ -21,7 +21,8 @@ libsecp256k1 = "0.3.5" ring = "0.16.12" bytes = "0.5.6" account_utils = { path = "../../common/account_utils" } -eth2_ssz = { path = "../../consensus/ssz" } +eth2_ssz = "0.1.2" +eth2_ssz_derive = "0.1.0" [target.'cfg(target_os = "linux")'.dependencies] psutil = { version = "3.2.0", optional = true } diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index 9735c75054..6ce4cbc76f 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -3,12 +3,13 @@ use crate::{ ok_or_error, types::{BeaconState, Epoch, EthSpec, GenericResponse, ValidatorId}, - BeaconNodeHttpClient, Error, StateId, StatusCode, + BeaconNodeHttpClient, DepositData, Error, Eth1Data, Hash256, StateId, StatusCode, }; use proto_array::core::ProtoArray; use reqwest::IntoUrl; use serde::{Deserialize, Serialize}; use ssz::Decode; +use ssz_derive::{Decode, Encode}; pub use eth2_libp2p::{types::SyncState, PeerInfo}; @@ -145,6 +146,50 @@ impl Health { } } +/// Indicates how up-to-date the Eth1 caches are. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct Eth1SyncStatusData { + pub head_block_number: Option, + pub head_block_timestamp: Option, + pub latest_cached_block_number: Option, + pub latest_cached_block_timestamp: Option, + pub voting_period_start_timestamp: u64, + pub eth1_node_sync_status_percentage: f64, + pub lighthouse_is_cached_and_ready: bool, +} + +/// A fully parsed eth1 deposit contract log. +#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, Encode, Decode)] +pub struct DepositLog { + pub deposit_data: DepositData, + /// The block number of the log that included this `DepositData`. + pub block_number: u64, + /// The index included with the deposit log. + pub index: u64, + /// True if the signature is valid. + pub signature_is_valid: bool, +} + +/// A block of the eth1 chain. +#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, Encode, Decode)] +pub struct Eth1Block { + pub hash: Hash256, + pub timestamp: u64, + pub number: u64, + pub deposit_root: Option, + pub deposit_count: Option, +} + +impl Eth1Block { + pub fn eth1_data(self) -> Option { + Some(Eth1Data { + deposit_root: self.deposit_root?, + deposit_count: self.deposit_count?, + block_hash: self.hash, + }) + } +} + impl BeaconNodeHttpClient { /// Perform a HTTP GET request, returning `None` on a 404 error. async fn get_bytes_opt(&self, url: U) -> Result>, Error> { @@ -246,6 +291,51 @@ impl BeaconNodeHttpClient { self.get(path).await } + /// `GET lighthouse/eth1/syncing` + pub async fn get_lighthouse_eth1_syncing( + &self, + ) -> Result, Error> { + let mut path = self.server.clone(); + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("lighthouse") + .push("eth1") + .push("syncing"); + + self.get(path).await + } + + /// `GET lighthouse/eth1/block_cache` + pub async fn get_lighthouse_eth1_block_cache( + &self, + ) -> Result>, Error> { + let mut path = self.server.clone(); + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("lighthouse") + .push("eth1") + .push("block_cache"); + + self.get(path).await + } + + /// `GET lighthouse/eth1/deposit_cache` + pub async fn get_lighthouse_eth1_deposit_cache( + &self, + ) -> Result>, Error> { + let mut path = self.server.clone(); + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("lighthouse") + .push("eth1") + .push("deposit_cache"); + + self.get(path).await + } + /// `GET lighthouse/beacon/states/{state_id}/ssz` pub async fn get_lighthouse_beacon_states_ssz( &self,