Make eth1 caching work with fast synced node (#709)

* Add functions to get deposit_count and deposit_root from deposit cache

* Fetch deposit root and deposit count from cache

* Fix bugs

* Add test

* Compare deposit_count between the caching and http eth1 blocks

* Revert "Compare deposit_count between the caching and http eth1 blocks"

This reverts commit e3d0325ae6.

* Fetch deposit cache using binary search instead of linear search

* BlockCache waits till DepositCache is in sync

* Truncate required_blocks in block_cache upto latest_processed_block in deposit cache

* Clean up

* Handled getting deposit count before deploying deposit contract

* More cleanup

* Remove calls to http get deposit/count

* Fix block cache tests

* Minor changes

* Fix bootnode ports

* Address some of Paul's comments

* Optimize `get_deposit_root` by caching `DepositDataTree`

* Fix comments and minor changes

* Change eth1 default config parameters

* Use `Vec` instead of `HashMap` to store `deposit_roots`

* Minor renaming
This commit is contained in:
Pawan Dhananjay
2019-12-20 04:07:39 +05:30
committed by Paul Hauner
parent 251aea645c
commit 74b327b50d
4 changed files with 236 additions and 50 deletions

View File

@@ -2,9 +2,7 @@ use crate::metrics;
use crate::{
block_cache::{BlockCache, Error as BlockCacheError, Eth1Block},
deposit_cache::Error as DepositCacheError,
http::{
get_block, get_block_number, get_deposit_count, get_deposit_logs_in_range, get_deposit_root,
},
http::{get_block, get_block_number, get_deposit_logs_in_range},
inner::{DepositUpdater, Inner},
DepositLog,
};
@@ -27,14 +25,10 @@ const STANDARD_TIMEOUT_MILLIS: u64 = 15_000;
const BLOCK_NUMBER_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
/// Timeout when doing an eth_getBlockByNumber call.
const GET_BLOCK_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
/// Timeout when doing an eth_call to read the deposit contract root.
const GET_DEPOSIT_ROOT_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
/// Timeout when doing an eth_call to read the deposit contract deposit count.
const GET_DEPOSIT_COUNT_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
/// Timeout when doing an eth_getLogs to read the deposit contract logs.
const GET_DEPOSIT_LOG_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
#[derive(Debug, PartialEq, Clone)]
#[derive(Debug, PartialEq)]
pub enum Error {
/// The remote node is less synced that we expect, it is not useful until has done more
/// syncing.
@@ -118,8 +112,8 @@ impl Default for Config {
Self {
endpoint: "http://localhost:8545".into(),
deposit_contract_address: "0x0000000000000000000000000000000000000000".into(),
deposit_contract_deploy_block: 0,
lowest_cached_block_number: 0,
deposit_contract_deploy_block: 1,
lowest_cached_block_number: 1,
follow_distance: 128,
block_cache_truncation: Some(4_096),
auto_update_interval_millis: 7_000,
@@ -147,6 +141,9 @@ impl Service {
pub fn new(config: Config, log: Logger) -> Self {
Self {
inner: Arc::new(Inner {
deposit_cache: RwLock::new(DepositUpdater::new(
config.deposit_contract_deploy_block,
)),
config: RwLock::new(config),
..Inner::default()
}),
@@ -254,6 +251,7 @@ impl Service {
"Updated eth1 deposit cache";
"cached_deposits" => inner_1.deposit_cache.read().cache.len(),
"logs_imported" => logs_imported,
"last_processed_eth1_block" => inner_1.deposit_cache.read().last_processed_block,
),
Err(e) => error!(
log_a,
@@ -491,6 +489,7 @@ impl Service {
let cache_3 = self.inner.clone();
let cache_4 = self.inner.clone();
let cache_5 = self.inner.clone();
let cache_6 = self.inner.clone();
let block_cache_truncation = self.config().block_cache_truncation;
let max_blocks_per_update = self
@@ -527,7 +526,6 @@ impl Service {
let max_size = block_cache_truncation
.map(|n| n as u64)
.unwrap_or_else(u64::max_value);
if range_size > max_size {
// If the range of required blocks is larger than `max_size`, drop all
// existing blocks and download `max_size` count of blocks.
@@ -543,14 +541,22 @@ impl Service {
})
// Download the range of blocks and sequentially import them into the cache.
.and_then(move |required_block_numbers| {
// Last processed block in deposit cache
let latest_in_cache = cache_6
.deposit_cache
.read()
.last_processed_block
.unwrap_or(0);
let required_block_numbers = required_block_numbers
.into_iter()
.take(max_blocks_per_update);
.filter(|x| *x <= latest_in_cache)
.take(max_blocks_per_update)
.collect::<Vec<_>>();
// Produce a stream from the list of required block numbers and return a future that
// consumes the it.
stream::unfold(
required_block_numbers,
required_block_numbers.into_iter(),
move |mut block_numbers| match block_numbers.next() {
Some(block_number) => Some(
download_eth1_block(cache_2.clone(), block_number)
@@ -639,6 +645,16 @@ fn download_eth1_block<'a>(
cache: Arc<Inner>,
block_number: u64,
) -> impl Future<Item = Eth1Block, Error = Error> + 'a {
let deposit_root = cache
.deposit_cache
.read()
.cache
.get_deposit_root_from_cache(block_number);
let deposit_count = cache
.deposit_cache
.read()
.cache
.get_deposit_count_from_cache(block_number);
// Performs a `get_blockByNumber` call to an eth1 node.
get_block(
&cache.config.read().endpoint,
@@ -646,24 +662,7 @@ fn download_eth1_block<'a>(
Duration::from_millis(GET_BLOCK_TIMEOUT_MILLIS),
)
.map_err(Error::BlockDownloadFailed)
.join3(
// Perform 2x `eth_call` via an eth1 node to read the deposit contract root and count.
get_deposit_root(
&cache.config.read().endpoint,
&cache.config.read().deposit_contract_address,
block_number,
Duration::from_millis(GET_DEPOSIT_ROOT_TIMEOUT_MILLIS),
)
.map_err(Error::GetDepositRootFailed),
get_deposit_count(
&cache.config.read().endpoint,
&cache.config.read().deposit_contract_address,
block_number,
Duration::from_millis(GET_DEPOSIT_COUNT_TIMEOUT_MILLIS),
)
.map_err(Error::GetDepositCountFailed),
)
.map(|(http_block, deposit_root, deposit_count)| Eth1Block {
.map(move |http_block| Eth1Block {
hash: http_block.hash,
number: http_block.number,
timestamp: http_block.timestamp,