lighthouse/beacon_node/eth1/src/service.rs
Paul Hauner f229bbba1c Eth1 Integration (#542)
* Refactor to cache Eth1Data

* Fix merge conflicts and minor refactorings

* Rename Eth1Cache to Eth1DataCache

* Refactor events subscription

* Add deposits module to interface with BeaconChain deposits

* Remove utils

* Rename to types.rs and add trait constraints to Eth1DataFetcher

* Conform to trait constraints. Make Web3DataFetcher cloneable

* Make fetcher object member of deposit and eth1_data cache and other fixes
* Fix update_cache function
* Move fetch_eth1_data to impl block
* Fix deposit tests

* Create Eth1 object for interfacing with Beacon chain
* Add `run` function for running update_cache and subscribe_deposit_logs tasks
* Add logging

* Run `cargo fmt` and make tests pass

* Convert sync functions to async

* Add timeouts to web3 functions

* Return futures from cache functions

* Add failed chaining of futures

* Working cache updation

* Clean up tests and `update_cache` function

* Refactor `get_eth1_data` functions to work with future returning functions

* Refactor eth1 `run` function to work with modified `update_cache` api

* Minor changes

* Add distance parameter to `update_cache`

* Fix tests and other minor fixes

* Working integration with cache and deposits

* Add merkle_tree construction, proof generation and verification code

* Add function to construct and fetch Deposits for BeaconNode

* Add error handling

* Import ssz

* Add error handling to eth1 cache and fix minor errors

* Run rustfmt

* Fix minor bug

* Rename Eth1Error and change to Result<T>

* Change deposit fetching mechanism from notification based to poll based
* Add deposits from eth1 chain in a given range every `x` blocks
* Modify `run` function to accommodate changes
* Minor fixes

* Fix formatting

* Initial commit. web3 api working.

* Tidied up lib. Add function for fetching logs.

* Refactor with `Eth1DataFetcher` trait

* Add parsing for deposit contract logs and get_eth1_data function

* Add `get_eth1_votes` function

* Fix merge issue

* Refactor with `Config` struct. Remove `ContractConfig`

* Rename eth1_chain crate to eth1

* Rename files and read abi file using `fs::read`

* Move eth1 to lib

* Remove unnecessary mutability constraint

* Add `Web3Backend` for returning actual eth1 data

* Refactor `get_eth1_votes` to return a Result

* Delete `eth1_chain` crate

* Return `Result` from `get_deposits`

* Fix range of deposits to return to beacon chain

* Add `get_block_height_by_hash` trait function

* Add naive method for getting `previous_eth1_distance`

* Add eth1 config params to main config

* Add instructions for setting up eth1 testing environment

* Add build script to fetch deposit contract abi

* Contract ABI is part of compiled binary

* Fix minor bugs

* Move docs to lib

* Add timeout to config

* Remove print statements

* Change warn to error

* Fix typos

* Removed prints in test and get timeout value from config

* Fixed error types

* Added logging to web3_fetcher

* Refactor for modified web3 api

* Fix minor stuff

* Add build script

* Tidy, hide eth1 integration tests behind flag

* Add http crate

* Add first stages of eth1_test_rig

* Fix deposits on test rig

* Fix bug with deposit count method

* Add block hash getter to http eth1

* Clean eth1 http crate and tests

* Add script to start ganache

* Adds deposit tree to eth1-http

* Extend deposit tree tests

* Tidy tests in eth1-http

* Add more detail to get block request

* Add block cache to eth1-http

* Rename deposit tree to deposit cache

* Add initial updating to eth1-http

* Tidy updater

* Fix compile bugs in tests

* Adds an Eth1DataCache builder

* Reorg eth1-http files

* Add (failing) tests for eth1 updater

* Rename files, fix bug in eth1-http

* Ensure that ganache timestamps are increasing

* Fix bugs with getting eth1data ancestors

* Improve eth1 testing, fix bugs

* Add truncate method to block cache

* Add pruning to block cache update process

* Add tests for block pruning

* Allow for dropping an expired cache.

* Add more comments

* Add first compiling version of deposit updater

* Add common fn for getting range of required blocks

* Add passing deposit update test

* Improve tests

* Fix block pruning bug

* Add tests for running two updates at once

* Add updater services to eth1

* Add deposit collection to beacon chain

* Add incomplete builder experiments

* Add first working version of beacon chain builder

* Update test harness to new beacon chain type

* Rename builder file, tidy

* Add first working client builder

* Progress further on client builder

* Update beacon node binary to use client builder

* Ensure release tests compile

* Remove old eth1 crate

* Add first pass of new lighthouse binary

* Fix websocket server startup

* Remove old binary code from beacon_node crate

* Add first working beacon node tests

* Add genesis crate, new eth1 cache_2

* Add Service to Eth1Cache

* Refactor with general eth1 improvements

* Add passing genesis test

* Tidy, add comments

* Add more comments to eth1 service

* Add further eth1 progress

* Fix some bugs with genesis

* Fix eth1 bugs, make eth1 linking more efficient

* Shift logic in genesis service

* Add more comments to genesis service

* Add gzip, max request values, timeouts to http

* Update testnet parameters to suit goerli testnet

* Add ability to vary Fork, fix custom spec

* Be more explicit about deposit fork version

* Start adding beacon chain eth1 option

* Add more flexibility to prod client

* Further runtime refactoring

* Allow for starting from store

* Add bootstrapping to client config

* Add remote_beacon_node crate

* Update eth1 service for more configurability

* Update eth1 tests to use less runtimes

* Patch issues with tests using too many files

* Move dummy eth1 backend flag

* Ensure all tests pass

* Add ganache-cli to Dockerfile

* Use a special docker hub image for testing

* Appease clippy

* Move validator client into lighthouse binary

* Allow starting with dummy eth1 backend

* Improve logging

* Fix dummy eth1 backend from cli

* Add extra testnet command

* Ensure consistent spec in beacon node

* Update eth1 rig to work on goerli

* Tidy lcli, start adding support for yaml config

* Add incomplete YamlConfig struct

* Remove efforts at YamlConfig

* Add incomplete eth1 voting. Blocked on spec issues

* Add (untested) first pass at eth1 vote algo

* Add tests for winning vote

* Add more tests for eth1 chain

* Add more eth1 voting tests

* Added more eth1 voting testing

* Change test name

* Add more tests to eth1 chain

* Tidy eth1 generics, add more tests

* Improve comments

* Tidy beacon_node tests

* Tidy, rename JsonRpc.. to Caching..

* Tidy voting logic

* Tidy builder docs

* Add comments, tidy eth1

* Add more comments to eth1

* Fix bug with winning_vote

* Add doc comments to the `ClientBuilder`

* Remove commented-out code

* Improve `ClientBuilder` docs

* Add comments to client config

* Add decoding test for `ClientConfig`

* Remove unused `DepositSet` struct

* Tidy `block_cache`

* Remove commented out lines

* Remove unused code in `eth1` crate

* Remove old validator binary `main.rs`

* Tidy, fix tests compile error

* Add initial tests for get_deposits

* Remove dead code in eth1_test_rig

* Update TestingDepositBuilder

* Add testing for getting eth1 deposits

* Fix duplicate rand dep

* Remove dead code

* Remove accidentally-added files

* Fix comment in eth1_genesis_service

* Add .gitignore for eth1_test_rig

* Fix bug in eth1_genesis_service

* Remove dead code from eth2_config

* Fix tabs/spaces in root Cargo.toml

* Tidy eth1 crate

* Allow for re-use of eth1 service after genesis

* Update docs for new CLI

* Change README gif

* Tidy eth1 http module

* Tidy eth1 service

* Tidy environment crate

* Remove unused file

* Tidy, add comments

* Remove commented-out code

* Address majority of Michael's comments

* Address other PR comments

* Add link to issue alongside TODO
2019-11-15 14:47:51 +11:00


use crate::{
block_cache::{BlockCache, Error as BlockCacheError, Eth1Block},
deposit_cache::Error as DepositCacheError,
http::{
get_block, get_block_number, get_deposit_count, get_deposit_logs_in_range, get_deposit_root,
},
inner::{DepositUpdater, Inner},
DepositLog,
};
use exit_future::Exit;
use futures::{
future::{loop_fn, Loop},
stream, Future, Stream,
};
use parking_lot::{RwLock, RwLockReadGuard};
use serde::{Deserialize, Serialize};
use slog::{debug, error, trace, Logger};
use std::ops::{Range, RangeInclusive};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::timer::Delay;
const STANDARD_TIMEOUT_MILLIS: u64 = 15_000;
/// Timeout when doing an eth_blockNumber call.
const BLOCK_NUMBER_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
/// Timeout when doing an eth_getBlockByNumber call.
const GET_BLOCK_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
/// Timeout when doing an eth_call to read the deposit contract root.
const GET_DEPOSIT_ROOT_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
/// Timeout when doing an eth_call to read the deposit contract deposit count.
const GET_DEPOSIT_COUNT_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
/// Timeout when doing an eth_getLogs to read the deposit contract logs.
const GET_DEPOSIT_LOG_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
#[derive(Debug, PartialEq, Clone)]
pub enum Error {
/// The remote node is less synced than we expect; it is not useful until it has done more
/// syncing.
RemoteNotSynced {
next_required_block: u64,
remote_highest_block: u64,
follow_distance: u64,
},
/// Failed to download a block from the eth1 node.
BlockDownloadFailed(String),
/// Failed to get the current block number from the eth1 node.
GetBlockNumberFailed(String),
/// Failed to read the deposit contract root from the eth1 node.
GetDepositRootFailed(String),
/// Failed to read the deposit contract deposit count from the eth1 node.
GetDepositCountFailed(String),
/// Failed to read the deposit contract logs from the eth1 node.
GetDepositLogsFailed(String),
/// There was an inconsistency when adding a block to the cache.
FailedToInsertEth1Block(BlockCacheError),
/// There was an inconsistency when adding a deposit to the cache.
FailedToInsertDeposit(DepositCacheError),
/// A log downloaded from the eth1 contract was not well formed.
FailedToParseDepositLog {
block_range: Range<u64>,
error: String,
},
/// There was an unexpected internal error.
Internal(String),
}
/// The success message for an Eth1Data cache update.
#[derive(Debug, PartialEq, Clone)]
pub enum BlockCacheUpdateOutcome {
/// The cache was successfully updated.
Success {
blocks_imported: usize,
head_block_number: Option<u64>,
},
}
/// The success message for an Eth1 deposit cache update.
#[derive(Debug, PartialEq, Clone)]
pub enum DepositCacheUpdateOutcome {
/// The cache was successfully updated.
Success { logs_imported: usize },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
/// An Eth1 node (e.g., Geth) running an HTTP JSON-RPC endpoint.
pub endpoint: String,
/// The address the `BlockCache` and `DepositCache` should assume is the canonical deposit contract.
pub deposit_contract_address: String,
/// Defines the first block that the `DepositCache` will start searching for deposit logs.
///
/// Setting too high can result in missed logs. Setting too low will result in unnecessary
/// calls to the Eth1 node's HTTP JSON RPC.
pub deposit_contract_deploy_block: u64,
/// Defines the lowest block number that should be downloaded and added to the `BlockCache`.
pub lowest_cached_block_number: u64,
/// Defines how far behind the Eth1 node's head we should follow.
///
/// Note: this should be less than or equal to the specification's `ETH1_FOLLOW_DISTANCE`.
pub follow_distance: u64,
/// Defines the number of blocks that should be retained each time the `BlockCache` calls truncate on
/// itself.
pub block_cache_truncation: Option<usize>,
/// The interval between updates when using the `auto_update` function.
pub auto_update_interval_millis: u64,
/// The span of blocks we should query for logs, per request.
pub blocks_per_log_query: usize,
/// The maximum number of log requests per update.
pub max_log_requests_per_update: Option<usize>,
/// The maximum number of blocks downloaded per update.
pub max_blocks_per_update: Option<usize>,
}
impl Default for Config {
fn default() -> Self {
Self {
endpoint: "http://localhost:8545".into(),
deposit_contract_address: "0x0000000000000000000000000000000000000000".into(),
deposit_contract_deploy_block: 0,
lowest_cached_block_number: 0,
follow_distance: 128,
block_cache_truncation: Some(4_096),
auto_update_interval_millis: 500,
blocks_per_log_query: 1_000,
max_log_requests_per_update: None,
max_blocks_per_update: None,
}
}
}
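// A minimal sketch of how a caller might override a single default while keeping the rest,
// using struct update syntax. `SketchConfig` and `config_for_endpoint` are hypothetical names
// introduced only for illustration; `SketchConfig` mirrors just three of the fields of `Config`
// above so that the example compiles on its own.

```rust
/// Hypothetical, trimmed-down stand-in for `Config`; only three fields are kept.
#[derive(Debug, Clone, PartialEq)]
pub struct SketchConfig {
    pub endpoint: String,
    pub follow_distance: u64,
    pub block_cache_truncation: Option<usize>,
}

impl Default for SketchConfig {
    fn default() -> Self {
        // Same values as the corresponding fields in `Config::default()`.
        Self {
            endpoint: "http://localhost:8545".into(),
            follow_distance: 128,
            block_cache_truncation: Some(4_096),
        }
    }
}

/// Overrides only the endpoint; every other field keeps its default value.
pub fn config_for_endpoint(endpoint: &str) -> SketchConfig {
    SketchConfig {
        endpoint: endpoint.to_string(),
        ..SketchConfig::default()
    }
}
```

// The same pattern should apply to the real `Config`, since all of its fields are public.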
/// Provides a set of Eth1 caches and async functions to update them.
///
/// Stores the following caches:
///
/// - Deposit cache: stores all deposit logs from the deposit contract.
/// - Block cache: stores some number of eth1 blocks.
#[derive(Clone)]
pub struct Service {
inner: Arc<Inner>,
pub log: Logger,
}
impl Service {
/// Creates a new service. Does not attempt to connect to the eth1 node.
pub fn new(config: Config, log: Logger) -> Self {
Self {
inner: Arc::new(Inner {
config: RwLock::new(config),
..Inner::default()
}),
log,
}
}
/// Provides access to the block cache.
pub fn blocks(&self) -> &RwLock<BlockCache> {
&self.inner.block_cache
}
/// Provides access to the deposit cache.
pub fn deposits(&self) -> &RwLock<DepositUpdater> {
&self.inner.deposit_cache
}
/// Returns the number of currently cached blocks.
pub fn block_cache_len(&self) -> usize {
self.blocks().read().len()
}
/// Returns the number of deposits available in the deposit cache.
pub fn deposit_cache_len(&self) -> usize {
self.deposits().read().cache.len()
}
/// Read the service's configuration.
pub fn config(&self) -> RwLockReadGuard<Config> {
self.inner.config.read()
}
/// Updates the configuration in `self` to be `new_config`.
///
/// Will truncate the block cache if the new config specifies truncation.
pub fn update_config(&self, new_config: Config) -> Result<(), String> {
let mut old_config = self.inner.config.write();
if new_config.deposit_contract_deploy_block != old_config.deposit_contract_deploy_block {
// This may be possible, I just haven't looked into the details to ensure it's safe.
Err("Updating deposit_contract_deploy_block is not supported".to_string())
} else {
*old_config = new_config;
// Prevents a locking condition when calling prune_blocks.
drop(old_config);
self.inner.prune_blocks();
Ok(())
}
}
/// Set the lowest block that the block cache will store.
///
/// Note: this block may not always be present if truncating is enabled.
pub fn set_lowest_cached_block(&self, block_number: u64) {
self.inner.config.write().lowest_cached_block_number = block_number;
}
/// Update the deposit and block caches, returning an error if either fails.
///
/// ## Returns
///
/// - Ok(_) if the update was successful (the cache may or may not have been modified).
/// - Err(_) if there is an error.
///
/// Emits logs for debugging and errors.
pub fn update(
&self,
) -> impl Future<Item = (DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), Error = String>
{
let log_a = self.log.clone();
let log_b = self.log.clone();
let deposit_future = self
.update_deposit_cache()
.map_err(|e| format!("Failed to update eth1 cache: {:?}", e))
.then(move |result| {
match &result {
Ok(DepositCacheUpdateOutcome::Success { logs_imported }) => trace!(
log_a,
"Updated eth1 deposit cache";
"logs_imported" => logs_imported,
),
Err(e) => error!(
log_a,
"Failed to update eth1 deposit cache";
"error" => e
),
};
result
});
let block_future = self
.update_block_cache()
.map_err(|e| format!("Failed to update eth1 cache: {:?}", e))
.then(move |result| {
match &result {
Ok(BlockCacheUpdateOutcome::Success {
blocks_imported,
head_block_number,
}) => trace!(
log_b,
"Updated eth1 block cache";
"blocks_imported" => blocks_imported,
"head_block" => head_block_number,
),
Err(e) => error!(
log_b,
"Failed to update eth1 block cache";
"error" => e
),
};
result
});
deposit_future.join(block_future)
}
/// A looping future that updates the cache, then waits `config.auto_update_interval_millis`
/// before updating it again.
///
/// ## Returns
///
/// - Ok(()) once `exit` is no longer live. Update and timer failures are logged and do not
/// break the loop, so the future does not resolve with an error.
///
/// Emits logs for debugging and errors.
pub fn auto_update(&self, exit: Exit) -> impl Future<Item = (), Error = ()> {
let service = self.clone();
let log = self.log.clone();
let update_interval = Duration::from_millis(self.config().auto_update_interval_millis);
loop_fn((), move |()| {
let exit = exit.clone();
let service = service.clone();
let log_a = log.clone();
let log_b = log.clone();
service
.update()
.then(move |update_result| {
match update_result {
Err(e) => error!(
log_a,
"Failed to update eth1 genesis cache";
"retry_millis" => update_interval.as_millis(),
"error" => e,
),
Ok((deposit, block)) => debug!(
log_a,
"Updated eth1 genesis cache";
"retry_millis" => update_interval.as_millis(),
"blocks" => format!("{:?}", block),
"deposits" => format!("{:?}", deposit),
),
};
// Do not break the loop if there is an update failure.
Ok(())
})
.and_then(move |_| Delay::new(Instant::now() + update_interval))
.then(move |timer_result| {
if let Err(e) = timer_result {
error!(
log_b,
"Failed to trigger eth1 cache update delay";
"error" => format!("{:?}", e),
);
}
// Do not break the loop if there is a timer failure.
Ok(())
})
.map(move |_| {
if exit.is_live() {
Loop::Continue(())
} else {
Loop::Break(())
}
})
})
}
/// Contacts the remote eth1 node and attempts to import deposit logs up to the configured
/// follow-distance block.
///
/// Will process no more than `blocks_per_log_query * max_log_requests_per_update` blocks (see
/// `Config`) in a single update.
///
/// ## Resolves with
///
/// - Ok(_) if the update was successful (the cache may or may not have been modified).
/// - Err(_) if there is an error.
///
/// Emits logs for debugging and errors.
pub fn update_deposit_cache(
&self,
) -> impl Future<Item = DepositCacheUpdateOutcome, Error = Error> {
let service_1 = self.clone();
let service_2 = self.clone();
let blocks_per_log_query = self.config().blocks_per_log_query;
let max_log_requests_per_update = self
.config()
.max_log_requests_per_update
.unwrap_or_else(usize::max_value);
let next_required_block = self
.deposits()
.read()
.last_processed_block
.map(|n| n + 1)
.unwrap_or_else(|| self.config().deposit_contract_deploy_block);
get_new_block_numbers(
&self.config().endpoint,
next_required_block,
self.config().follow_distance,
)
.map(move |range| {
range
.map(|range| {
range
.collect::<Vec<u64>>()
.chunks(blocks_per_log_query)
.take(max_log_requests_per_update)
.map(|vec| {
let first = vec.first().cloned().unwrap_or_else(|| 0);
let last = vec.last().map(|n| n + 1).unwrap_or_else(|| 0);
(first..last)
})
.collect::<Vec<Range<u64>>>()
})
.unwrap_or_else(|| vec![])
})
.and_then(move |block_number_chunks| {
stream::unfold(
block_number_chunks.into_iter(),
move |mut chunks| match chunks.next() {
Some(chunk) => {
let chunk_1 = chunk.clone();
Some(
get_deposit_logs_in_range(
&service_1.config().endpoint,
&service_1.config().deposit_contract_address,
chunk,
Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS),
)
.map_err(Error::GetDepositLogsFailed)
.map(|logs| (chunk_1, logs))
.map(|logs| (logs, chunks)),
)
}
None => None,
},
)
.fold(0, move |mut sum, (block_range, log_chunk)| {
let mut cache = service_2.deposits().write();
log_chunk
.into_iter()
.map(|raw_log| {
DepositLog::from_log(&raw_log).map_err(|error| {
Error::FailedToParseDepositLog {
block_range: block_range.clone(),
error,
}
})
})
// Return early if any of the logs cannot be parsed.
//
// This costs an additional `collect`, however it enforces that no logs are
// imported if any one of them cannot be parsed.
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.map(|deposit_log| {
cache
.cache
.insert_log(deposit_log)
.map_err(Error::FailedToInsertDeposit)?;
sum += 1;
Ok(())
})
// Returns if a deposit is unable to be added to the cache.
//
// If this error occurs, the cache will no longer be guaranteed to hold either
// none or all of the logs for each block (i.e., there may exist _some_ logs for
// a block, but not _all_ logs for that block). This scenario can cause the
// node to choose an invalid genesis state or propose an invalid block.
.collect::<Result<_, _>>()?;
cache.last_processed_block = Some(block_range.end.saturating_sub(1));
Ok(sum)
})
.map(|logs_imported| DepositCacheUpdateOutcome::Success { logs_imported })
})
}
/// Contacts the remote eth1 node and attempts to import all blocks up to the configured
/// follow-distance block.
///
/// If configured, prunes the block cache after importing new blocks.
///
/// ## Resolves with
///
/// - Ok(_) if the update was successful (the cache may or may not have been modified).
/// - Err(_) if there is an error.
///
/// Emits logs for debugging and errors.
pub fn update_block_cache(&self) -> impl Future<Item = BlockCacheUpdateOutcome, Error = Error> {
let cache_1 = self.inner.clone();
let cache_2 = self.inner.clone();
let cache_3 = self.inner.clone();
let cache_4 = self.inner.clone();
let cache_5 = self.inner.clone();
let block_cache_truncation = self.config().block_cache_truncation;
let max_blocks_per_update = self
.config()
.max_blocks_per_update
.unwrap_or_else(usize::max_value);
let next_required_block = cache_1
.block_cache
.read()
.highest_block_number()
.map(|n| n + 1)
.unwrap_or_else(|| self.config().lowest_cached_block_number);
get_new_block_numbers(
&self.config().endpoint,
next_required_block,
self.config().follow_distance,
)
// Map the range of required blocks into a Vec.
//
// If the required range is larger than the size of the cache, drop the existing cache
// because it's expired and just download enough blocks to fill the cache.
.and_then(move |range| {
range
.map(|range| {
if range.start() > range.end() {
// Note: this check is not strictly necessary, however it remains as a safeguard
// against any regression which may cause an underflow in a following
// subtraction operation.
Err(Error::Internal("Range was not increasing".into()))
} else {
let range_size = range.end() - range.start();
let max_size = block_cache_truncation
.map(|n| n as u64)
.unwrap_or_else(u64::max_value);
if range_size > max_size {
// If the range of required blocks is larger than `max_size`, drop all
// existing blocks and download `max_size` count of blocks.
let first_block = range.end() - max_size;
(*cache_5.block_cache.write()) = BlockCache::default();
Ok((first_block..=*range.end()).collect::<Vec<u64>>())
} else {
Ok(range.collect::<Vec<u64>>())
}
}
})
.unwrap_or_else(|| Ok(vec![]))
})
// Download the range of blocks and sequentially import them into the cache.
.and_then(move |required_block_numbers| {
let required_block_numbers = required_block_numbers
.into_iter()
.take(max_blocks_per_update);
// Produce a stream from the list of required block numbers and return a future that
// consumes it.
stream::unfold(
required_block_numbers,
move |mut block_numbers| match block_numbers.next() {
Some(block_number) => Some(
download_eth1_block(cache_2.clone(), block_number)
.map(|v| (v, block_numbers)),
),
None => None,
},
)
.fold(0, move |sum, eth1_block| {
cache_3
.block_cache
.write()
.insert_root_or_child(eth1_block)
.map_err(Error::FailedToInsertEth1Block)?;
Ok(sum + 1)
})
})
.and_then(move |blocks_imported| {
// Prune the block cache, preventing it from growing too large.
cache_4.prune_blocks();
Ok(BlockCacheUpdateOutcome::Success {
blocks_imported,
head_block_number: cache_4.clone().block_cache.read().highest_block_number(),
})
})
}
}
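// The chunking step in `update_deposit_cache` can be sketched without first collecting every
// block number into a `Vec<u64>`. This is a minimal sketch, not the code used above:
// `chunk_block_range` is a hypothetical helper, it takes `blocks_per_query` as a `u64` rather
// than a `usize`, and it returns no chunks when `blocks_per_query` is zero.

```rust
use std::ops::{Range, RangeInclusive};

/// Splits an inclusive block range into at most `max_requests` half-open ranges, each covering
/// at most `blocks_per_query` blocks.
///
/// Assumes the end of `blocks` is below `u64::max_value()`, so that `last + 1` is representable.
pub fn chunk_block_range(
    blocks: RangeInclusive<u64>,
    blocks_per_query: u64,
    max_requests: usize,
) -> Vec<Range<u64>> {
    let (start, end) = (*blocks.start(), *blocks.end());
    if blocks_per_query == 0 || start > end {
        return vec![];
    }

    let mut chunks = Vec::new();
    let mut next = start;
    while chunks.len() < max_requests {
        // `last` is the final block covered by this chunk (inclusive).
        let last = next.saturating_add(blocks_per_query - 1).min(end);
        chunks.push(next..last.saturating_add(1));
        if last == end {
            break;
        }
        next = last + 1;
    }
    chunks
}
```

// Each returned range has the same shape as the `first..last` ranges passed to
// `get_deposit_logs_in_range` above: the end is one past the last block queried.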
/// Determine the range of blocks that need to be downloaded, given the remote's best block and
/// the locally stored best block.
fn get_new_block_numbers<'a>(
endpoint: &str,
next_required_block: u64,
follow_distance: u64,
) -> impl Future<Item = Option<RangeInclusive<u64>>, Error = Error> + 'a {
get_block_number(endpoint, Duration::from_millis(BLOCK_NUMBER_TIMEOUT_MILLIS))
.map_err(Error::GetBlockNumberFailed)
.and_then(move |remote_highest_block| {
let remote_follow_block = remote_highest_block.saturating_sub(follow_distance);
if next_required_block <= remote_follow_block {
Ok(Some(next_required_block..=remote_follow_block))
} else if next_required_block > remote_highest_block + 1 {
// If this is the case, the node must have gone "backwards" in terms of its sync
// (i.e., its head block is lower than it was before).
//
// We assume that the `follow_distance` should be sufficient to ensure this never
// happens, otherwise it is an error.
Err(Error::RemoteNotSynced {
next_required_block,
remote_highest_block,
follow_distance,
})
} else {
// Return an empty range.
Ok(None)
}
})
}
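// The decision made inside `get_new_block_numbers` is pure arithmetic once the remote head is
// known, so it can be sketched as a synchronous function. `required_block_range` and
// `SketchError` are hypothetical names introduced for illustration. The sketch uses
// `saturating_add` where the code above uses `+ 1`, which only differs when the remote head is
// `u64::max_value()`.

```rust
use std::ops::RangeInclusive;

/// Hypothetical stand-in for the `Error::RemoteNotSynced` variant.
#[derive(Debug, PartialEq)]
pub enum SketchError {
    RemoteNotSynced {
        next_required_block: u64,
        remote_highest_block: u64,
        follow_distance: u64,
    },
}

/// Returns the inclusive range of blocks to download, `None` if there is nothing new behind the
/// follow distance, or an error if the remote head is lower than blocks we already hold.
pub fn required_block_range(
    next_required_block: u64,
    remote_highest_block: u64,
    follow_distance: u64,
) -> Result<Option<RangeInclusive<u64>>, SketchError> {
    // The newest block we are willing to import.
    let remote_follow_block = remote_highest_block.saturating_sub(follow_distance);

    if next_required_block <= remote_follow_block {
        Ok(Some(next_required_block..=remote_follow_block))
    } else if next_required_block > remote_highest_block.saturating_add(1) {
        // The remote head is behind what we have already processed.
        Err(SketchError::RemoteNotSynced {
            next_required_block,
            remote_highest_block,
            follow_distance,
        })
    } else {
        // We are caught up to the follow block; nothing to download yet.
        Ok(None)
    }
}
```

// For example, with a remote head of 1000 and a follow distance of 128, the follow block is 872:
// a node that next requires block 100 gets `100..=872`, a node that next requires block 900 gets
// `None`, and a node that next requires block 1002 gets `RemoteNotSynced`.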
/// Downloads the `(block, deposit_root, deposit_count)` tuple from an eth1 node for the given
/// `block_number`.
///
/// Performs three async calls to an Eth1 HTTP JSON RPC endpoint.
fn download_eth1_block<'a>(
cache: Arc<Inner>,
block_number: u64,
) -> impl Future<Item = Eth1Block, Error = Error> + 'a {
// Performs an `eth_getBlockByNumber` call to an eth1 node.
get_block(
&cache.config.read().endpoint,
block_number,
Duration::from_millis(GET_BLOCK_TIMEOUT_MILLIS),
)
.map_err(Error::BlockDownloadFailed)
.join3(
// Perform 2x `eth_call` via an eth1 node to read the deposit contract root and count.
get_deposit_root(
&cache.config.read().endpoint,
&cache.config.read().deposit_contract_address,
block_number,
Duration::from_millis(GET_DEPOSIT_ROOT_TIMEOUT_MILLIS),
)
.map_err(Error::GetDepositRootFailed),
get_deposit_count(
&cache.config.read().endpoint,
&cache.config.read().deposit_contract_address,
block_number,
Duration::from_millis(GET_DEPOSIT_COUNT_TIMEOUT_MILLIS),
)
.map_err(Error::GetDepositCountFailed),
)
.map(|(http_block, deposit_root, deposit_count)| Eth1Block {
hash: http_block.hash,
number: http_block.number,
timestamp: http_block.timestamp,
deposit_root,
deposit_count,
})
}
#[cfg(test)]
mod tests {
use super::*;
use toml;
#[test]
fn serde_serialize() {
let serialized =
toml::to_string(&Config::default()).expect("Should serde encode default config");
toml::from_str::<Config>(&serialized).expect("Should serde decode default config");
}
}