Return eth1-related data via the API (#1797)

## Issue Addressed

- Related to #1691

## Proposed Changes

Adds the following API endpoints:

- `GET lighthouse/eth1/syncing`: status about how synced we are with Eth1.
- `GET lighthouse/eth1/block_cache`: all locally cached eth1 blocks.
- `GET lighthouse/eth1/deposit_cache`: all locally cached eth1 deposits.

Additionally:

- Moves some types from the `beacon_node/eth1` to the `common/eth2` crate, so they can be used in the API without duplication.
- Allow `update_deposit_cache` and `update_block_cache` to take an optional head block number to avoid duplicate requests.

## Additional Info

TBC
This commit is contained in:
Paul Hauner
2020-11-02 00:37:30 +00:00
parent 6c0c050fbb
commit 7afbaa807e
18 changed files with 638 additions and 101 deletions

View File

@@ -1,6 +1,7 @@
use ssz_derive::{Decode, Encode};
use std::ops::RangeInclusive;
use types::{Eth1Data, Hash256};
pub use eth2::lighthouse::Eth1Block;
#[derive(Debug, PartialEq, Clone)]
pub enum Error {
@@ -15,28 +16,6 @@ pub enum Error {
Internal(String),
}
/// A block of the eth1 chain.
///
/// Contains all information required to add a `BlockCache` entry.
#[derive(Debug, PartialEq, Clone, Eq, Hash, Encode, Decode)]
pub struct Eth1Block {
pub hash: Hash256,
pub timestamp: u64,
pub number: u64,
pub deposit_root: Option<Hash256>,
pub deposit_count: Option<u64>,
}
impl Eth1Block {
pub fn eth1_data(self) -> Option<Eth1Data> {
Some(Eth1Data {
deposit_root: self.deposit_root?,
deposit_count: self.deposit_count?,
block_hash: self.hash,
})
}
}
/// Stores block and deposit contract information and provides queries based upon the block
/// timestamp.
#[derive(Debug, PartialEq, Clone, Default, Encode, Decode)]
@@ -55,6 +34,16 @@ impl BlockCache {
self.blocks.is_empty()
}
/// Returns the earliest (lowest timestamp) block, if any.
pub fn earliest_block(&self) -> Option<&Eth1Block> {
self.blocks.first()
}
/// Returns the latest (highest timestamp) block, if any.
pub fn latest_block(&self) -> Option<&Eth1Block> {
self.blocks.last()
}
/// Returns the timestamp of the earliest block in the cache (if any).
pub fn earliest_block_timestamp(&self) -> Option<u64> {
self.blocks.first().map(|block| block.timestamp)
@@ -181,6 +170,7 @@ impl BlockCache {
#[cfg(test)]
mod tests {
use super::*;
use types::Hash256;
fn get_block(i: u64, interval_secs: u64) -> Eth1Block {
Eth1Block {

View File

@@ -304,7 +304,7 @@ pub mod tests {
block_number: 42,
data: EXAMPLE_LOG.to_vec(),
};
DepositLog::from_log(&log, &spec).expect("should decode log")
log.to_deposit_log(&spec).expect("should decode log")
}
#[test]

View File

@@ -1,11 +1,12 @@
use super::http::Log;
use ssz::Decode;
use ssz_derive::{Decode, Encode};
use state_processing::per_block_processing::signature_sets::{
deposit_pubkey_signature_message, deposit_signature_set,
};
use types::{ChainSpec, DepositData, Hash256, PublicKeyBytes, SignatureBytes};
pub use eth2::lighthouse::DepositLog;
/// The following constants define the layout of bytes in the deposit contract `DepositEvent`. The
/// event bytes are formatted according to the Ethereum ABI.
const PUBKEY_START: usize = 192;
@@ -19,22 +20,10 @@ const SIG_LEN: usize = 96;
const INDEX_START: usize = SIG_START + 96 + 32;
const INDEX_LEN: usize = 8;
/// A fully parsed eth1 deposit contract log.
#[derive(Debug, PartialEq, Clone, Encode, Decode)]
pub struct DepositLog {
pub deposit_data: DepositData,
/// The block number of the log that included this `DepositData`.
pub block_number: u64,
/// The index included with the deposit log.
pub index: u64,
/// True if the signature is valid.
pub signature_is_valid: bool,
}
impl DepositLog {
impl Log {
/// Attempts to parse a raw `Log` from the deposit contract into a `DepositLog`.
pub fn from_log(log: &Log, spec: &ChainSpec) -> Result<Self, String> {
let bytes = &log.data;
pub fn to_deposit_log(&self, spec: &ChainSpec) -> Result<DepositLog, String> {
let bytes = &self.data;
let pubkey = bytes
.get(PUBKEY_START..PUBKEY_START + PUBKEY_LEN)
@@ -68,7 +57,7 @@ impl DepositLog {
Ok(DepositLog {
deposit_data,
block_number: log.block_number,
block_number: self.block_number,
index: u64::from_ssz_bytes(index).map_err(|e| format!("Invalid index ssz: {:?}", e))?,
signature_is_valid,
})
@@ -77,7 +66,6 @@ impl DepositLog {
#[cfg(test)]
pub mod tests {
use super::*;
use crate::http::Log;
use types::{EthSpec, MainnetEthSpec};
@@ -113,6 +101,7 @@ pub mod tests {
block_number: 42,
data: EXAMPLE_LOG.to_vec(),
};
DepositLog::from_log(&log, &MainnetEthSpec::default_spec()).expect("should decode log");
log.to_deposit_log(&MainnetEthSpec::default_spec())
.expect("should decode log");
}
}

View File

@@ -39,6 +39,13 @@ pub enum Eth1NetworkId {
Custom(u64),
}
/// Used to identify a block when querying the Eth1 node.
#[derive(Clone, Copy)]
pub enum BlockQuery {
Number(u64),
Latest,
}
impl Into<u64> for Eth1NetworkId {
fn into(self) -> u64 {
match self {
@@ -107,11 +114,15 @@ pub async fn get_block_number(endpoint: &str, timeout: Duration) -> Result<u64,
/// Uses HTTP JSON RPC at `endpoint`. E.g., `http://localhost:8545`.
pub async fn get_block(
endpoint: &str,
block_number: u64,
query: BlockQuery,
timeout: Duration,
) -> Result<Block, String> {
let query_param = match query {
BlockQuery::Number(block_number) => format!("0x{:x}", block_number),
BlockQuery::Latest => "latest".to_string(),
};
let params = json!([
format!("0x{:x}", block_number),
query_param,
false // do not return full tx objects.
]);

View File

@@ -1,6 +1,6 @@
use crate::Config;
use crate::{
block_cache::BlockCache,
block_cache::{BlockCache, Eth1Block},
deposit_cache::{DepositCache, SszDepositCache},
};
use parking_lot::RwLock;
@@ -29,6 +29,7 @@ pub struct Inner {
pub block_cache: RwLock<BlockCache>,
pub deposit_cache: RwLock<DepositUpdater>,
pub config: RwLock<Config>,
pub remote_head_block: RwLock<Option<Eth1Block>>,
pub spec: ChainSpec,
}
@@ -86,6 +87,9 @@ impl SszEth1Cache {
cache: self.deposit_cache.to_deposit_cache()?,
last_processed_block: self.last_processed_block,
}),
// Set the remote head_block zero when creating a new instance. We only care about
// present and future eth1 nodes.
remote_head_block: RwLock::new(None),
config: RwLock::new(config),
spec,
})

View File

@@ -3,10 +3,10 @@ use crate::{
block_cache::{BlockCache, Error as BlockCacheError, Eth1Block},
deposit_cache::Error as DepositCacheError,
http::{
get_block, get_block_number, get_deposit_logs_in_range, get_network_id, Eth1NetworkId, Log,
get_block, get_block_number, get_deposit_logs_in_range, get_network_id, BlockQuery,
Eth1NetworkId, Log,
},
inner::{DepositUpdater, Inner},
DepositLog,
};
use futures::{future::TryFutureExt, stream, stream::TryStreamExt, StreamExt};
use parking_lot::{RwLock, RwLockReadGuard};
@@ -148,6 +148,7 @@ impl Service {
deposit_cache: RwLock::new(DepositUpdater::new(
config.deposit_contract_deploy_block,
)),
remote_head_block: RwLock::new(None),
config: RwLock::new(config),
spec,
}),
@@ -206,6 +207,21 @@ impl Service {
self.inner.block_cache.read().latest_block_timestamp()
}
/// Returns the latest head block returned from an Eth1 node.
///
/// ## Note
///
/// This is the simply the head of the Eth1 chain, with no regard to follow distance or the
/// voting period start.
pub fn head_block(&self) -> Option<Eth1Block> {
self.inner.remote_head_block.read().as_ref().cloned()
}
/// Returns the latest cached block.
pub fn latest_cached_block(&self) -> Option<Eth1Block> {
self.inner.block_cache.read().latest_block().cloned()
}
/// Returns the lowest block number stored.
pub fn lowest_block_number(&self) -> Option<u64> {
self.inner.block_cache.read().lowest_block_number()
@@ -301,9 +317,16 @@ impl Service {
pub async fn update(
&self,
) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> {
let remote_head_block = download_eth1_block(self.inner.clone(), None)
.map_err(|e| format!("Failed to update Eth1 service: {:?}", e))
.await?;
let remote_head_block_number = Some(remote_head_block.number);
*self.inner.remote_head_block.write() = Some(remote_head_block);
let update_deposit_cache = async {
let outcome = self
.update_deposit_cache()
.update_deposit_cache(remote_head_block_number)
.await
.map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?;
@@ -314,12 +337,12 @@ impl Service {
"logs_imported" => outcome.logs_imported,
"last_processed_eth1_block" => self.inner.deposit_cache.read().last_processed_block,
);
Ok(outcome)
Ok::<_, String>(outcome)
};
let update_block_cache = async {
let outcome = self
.update_block_cache()
.update_block_cache(remote_head_block_number)
.await
.map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?;
@@ -330,10 +353,13 @@ impl Service {
"blocks_imported" => outcome.blocks_imported,
"head_block" => outcome.head_block_number,
);
Ok(outcome)
Ok::<_, String>(outcome)
};
futures::try_join!(update_deposit_cache, update_block_cache)
let (deposit_outcome, block_outcome) =
futures::try_join!(update_deposit_cache, update_block_cache)?;
Ok((deposit_outcome, block_outcome))
}
/// A looping future that updates the cache, then waits `config.auto_update_interval` before
@@ -413,13 +439,19 @@ impl Service {
/// Will process no more than `BLOCKS_PER_LOG_QUERY * MAX_LOG_REQUESTS_PER_UPDATE` blocks in a
/// single update.
///
/// If `remote_highest_block_opt` is `Some`, use that value instead of querying `self.endpoint`
/// for the head of the eth1 chain.
///
/// ## Resolves with
///
/// - Ok(_) if the update was successful (the cache may or may not have been modified).
/// - Err(_) if there is an error.
///
/// Emits logs for debugging and errors.
pub async fn update_deposit_cache(&self) -> Result<DepositCacheUpdateOutcome, Error> {
pub async fn update_deposit_cache(
&self,
remote_highest_block_opt: Option<u64>,
) -> Result<DepositCacheUpdateOutcome, Error> {
let endpoint = self.config().endpoint.clone();
let follow_distance = self.config().follow_distance;
let deposit_contract_address = self.config().deposit_contract_address.clone();
@@ -437,7 +469,13 @@ impl Service {
.map(|n| n + 1)
.unwrap_or_else(|| self.config().deposit_contract_deploy_block);
let range = get_new_block_numbers(&endpoint, next_required_block, follow_distance).await?;
let range = get_new_block_numbers(
&endpoint,
remote_highest_block_opt,
next_required_block,
follow_distance,
)
.await?;
let block_number_chunks = if let Some(range) = range {
range
@@ -483,7 +521,7 @@ impl Service {
log_chunk
.iter()
.map(|raw_log| {
DepositLog::from_log(&raw_log, self.inner.spec()).map_err(|error| {
raw_log.to_deposit_log(self.inner.spec()).map_err(|error| {
Error::FailedToParseDepositLog {
block_range: block_range.clone(),
error,
@@ -548,13 +586,19 @@ impl Service {
///
/// If configured, prunes the block cache after importing new blocks.
///
/// If `remote_highest_block_opt` is `Some`, use that value instead of querying `self.endpoint`
/// for the head of the eth1 chain.
///
/// ## Resolves with
///
/// - Ok(_) if the update was successful (the cache may or may not have been modified).
/// - Err(_) if there is an error.
///
/// Emits logs for debugging and errors.
pub async fn update_block_cache(&self) -> Result<BlockCacheUpdateOutcome, Error> {
pub async fn update_block_cache(
&self,
remote_highest_block_opt: Option<u64>,
) -> Result<BlockCacheUpdateOutcome, Error> {
let block_cache_truncation = self.config().block_cache_truncation;
let max_blocks_per_update = self
.config()
@@ -572,7 +616,13 @@ impl Service {
let endpoint = self.config().endpoint.clone();
let follow_distance = self.config().follow_distance;
let range = get_new_block_numbers(&endpoint, next_required_block, follow_distance).await?;
let range = get_new_block_numbers(
&endpoint,
remote_highest_block_opt,
next_required_block,
follow_distance,
)
.await?;
// Map the range of required blocks into a Vec.
//
// If the required range is larger than the size of the cache, drop the exiting cache
@@ -623,7 +673,7 @@ impl Service {
|mut block_numbers| async {
match block_numbers.next() {
Some(block_number) => {
match download_eth1_block(self.inner.clone(), block_number).await {
match download_eth1_block(self.inner.clone(), Some(block_number)).await {
Ok(eth1_block) => Ok(Some((eth1_block, block_numbers))),
Err(e) => Err(e),
}
@@ -708,13 +758,17 @@ impl Service {
/// the locally stored best block.
async fn get_new_block_numbers<'a>(
endpoint: &str,
remote_highest_block_opt: Option<u64>,
next_required_block: u64,
follow_distance: u64,
) -> Result<Option<RangeInclusive<u64>>, Error> {
let remote_highest_block =
let remote_highest_block = if let Some(block_number) = remote_highest_block_opt {
block_number
} else {
get_block_number(endpoint, Duration::from_millis(BLOCK_NUMBER_TIMEOUT_MILLIS))
.map_err(Error::GetBlockNumberFailed)
.await?;
.await?
};
let remote_follow_block = remote_highest_block.saturating_sub(follow_distance);
if next_required_block <= remote_follow_block {
@@ -739,26 +793,37 @@ async fn get_new_block_numbers<'a>(
/// Downloads the `(block, deposit_root, deposit_count)` tuple from an eth1 node for the given
/// `block_number`.
///
/// Set `block_number_opt = None` to get the "latest" eth1 block (i.e., the head).
///
/// Performs three async calls to an Eth1 HTTP JSON RPC endpoint.
async fn download_eth1_block(cache: Arc<Inner>, block_number: u64) -> Result<Eth1Block, Error> {
async fn download_eth1_block(
cache: Arc<Inner>,
block_number_opt: Option<u64>,
) -> Result<Eth1Block, Error> {
let endpoint = cache.config.read().endpoint.clone();
let deposit_root = cache
.deposit_cache
.read()
.cache
.get_deposit_root_from_cache(block_number);
let deposit_root = block_number_opt.and_then(|block_number| {
cache
.deposit_cache
.read()
.cache
.get_deposit_root_from_cache(block_number)
});
let deposit_count = cache
.deposit_cache
.read()
.cache
.get_deposit_count_from_cache(block_number);
let deposit_count = block_number_opt.and_then(|block_number| {
cache
.deposit_cache
.read()
.cache
.get_deposit_count_from_cache(block_number)
});
// Performs a `get_blockByNumber` call to an eth1 node.
let http_block = get_block(
&endpoint,
block_number,
block_number_opt
.map(BlockQuery::Number)
.unwrap_or_else(|| BlockQuery::Latest),
Duration::from_millis(GET_BLOCK_TIMEOUT_MILLIS),
)
.map_err(Error::BlockDownloadFailed)