Port eth1 lib to use stable futures

This commit is contained in:
pawan
2020-02-29 18:49:51 +05:30
committed by Age Manning
parent 0b2b379f14
commit a984772475
4 changed files with 3140 additions and 2846 deletions

5214
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -11,8 +11,8 @@ toml = "^0.5"
web3 = "0.8.0" web3 = "0.8.0"
[dependencies] [dependencies]
reqwest = "0.9" reqwest = "0.10"
futures = "0.1.25" futures = "0.3"
serde_json = "1.0" serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
hex = "0.3" hex = "0.3"
@@ -24,7 +24,7 @@ tree_hash = { path = "../../eth2/utils/tree_hash"}
eth2_hashing = { path = "../../eth2/utils/eth2_hashing"} eth2_hashing = { path = "../../eth2/utils/eth2_hashing"}
parking_lot = "0.7" parking_lot = "0.7"
slog = "^2.2.3" slog = "^2.2.3"
tokio = "0.1.22" tokio = { version = "0.2", features = ["full"] }
state_processing = { path = "../../eth2/state_processing" } state_processing = { path = "../../eth2/state_processing" }
libflate = "0.1" libflate = "0.1"
lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics"} lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics"}

View File

@@ -10,8 +10,8 @@
//! //!
//! There is no ABI parsing here, all function signatures and topics are hard-coded as constants. //! There is no ABI parsing here, all function signatures and topics are hard-coded as constants.
use futures::{Future, Stream}; use futures::future::TryFutureExt;
use reqwest::{header::CONTENT_TYPE, r#async::ClientBuilder, StatusCode}; use reqwest::{header::CONTENT_TYPE, ClientBuilder, StatusCode};
use serde_json::{json, Value}; use serde_json::{json, Value};
use std::ops::Range; use std::ops::Range;
use std::time::Duration; use std::time::Duration;
@@ -40,37 +40,31 @@ pub struct Block {
/// Returns the current block number. /// Returns the current block number.
/// ///
/// Uses HTTP JSON RPC at `endpoint`. E.g., `http://localhost:8545`. /// Uses HTTP JSON RPC at `endpoint`. E.g., `http://localhost:8545`.
pub fn get_block_number( pub async fn get_block_number(endpoint: &str, timeout: Duration) -> Result<u64, String> {
endpoint: &str, let response_body = send_rpc_request(endpoint, "eth_blockNumber", json!([]), timeout).await?;
timeout: Duration,
) -> impl Future<Item = u64, Error = String> {
send_rpc_request(endpoint, "eth_blockNumber", json!([]), timeout)
.and_then(|response_body| {
hex_to_u64_be( hex_to_u64_be(
response_result(&response_body)? response_result(&response_body)?
.ok_or_else(|| "No result field was returned for block number".to_string())? .ok_or_else(|| "No result field was returned for block number".to_string())?
.as_str() .as_str()
.ok_or_else(|| "Data was not string")?, .ok_or_else(|| "Data was not string")?,
) )
})
.map_err(|e| format!("Failed to get block number: {}", e)) .map_err(|e| format!("Failed to get block number: {}", e))
} }
/// Gets a block hash by block number. /// Gets a block hash by block number.
/// ///
/// Uses HTTP JSON RPC at `endpoint`. E.g., `http://localhost:8545`. /// Uses HTTP JSON RPC at `endpoint`. E.g., `http://localhost:8545`.
pub fn get_block( pub async fn get_block(
endpoint: &str, endpoint: &str,
block_number: u64, block_number: u64,
timeout: Duration, timeout: Duration,
) -> impl Future<Item = Block, Error = String> { ) -> Result<Block, String> {
let params = json!([ let params = json!([
format!("0x{:x}", block_number), format!("0x{:x}", block_number),
false // do not return full tx objects. false // do not return full tx objects.
]); ]);
send_rpc_request(endpoint, "eth_getBlockByNumber", params, timeout) let response_body = send_rpc_request(endpoint, "eth_getBlockByNumber", params, timeout).await?;
.and_then(|response_body| {
let hash = hex_to_bytes( let hash = hex_to_bytes(
response_result(&response_body)? response_result(&response_body)?
.ok_or_else(|| "No result field was returned for block".to_string())? .ok_or_else(|| "No result field was returned for block".to_string())?
@@ -112,7 +106,6 @@ pub fn get_block(
} else { } else {
Err(format!("Block number {} is larger than a usize", number)) Err(format!("Block number {} is larger than a usize", number))
} }
})
.map_err(|e| format!("Failed to get block number: {}", e)) .map_err(|e| format!("Failed to get block number: {}", e))
} }
@@ -122,20 +115,21 @@ pub fn get_block(
/// Assumes that the `address` has the same ABI as the eth2 deposit contract. /// Assumes that the `address` has the same ABI as the eth2 deposit contract.
/// ///
/// Uses HTTP JSON RPC at `endpoint`. E.g., `http://localhost:8545`. /// Uses HTTP JSON RPC at `endpoint`. E.g., `http://localhost:8545`.
pub fn get_deposit_count( pub async fn get_deposit_count(
endpoint: &str, endpoint: &str,
address: &str, address: &str,
block_number: u64, block_number: u64,
timeout: Duration, timeout: Duration,
) -> impl Future<Item = Option<u64>, Error = String> { ) -> Result<Option<u64>, String> {
call( let result = call(
endpoint, endpoint,
address, address,
DEPOSIT_COUNT_FN_SIGNATURE, DEPOSIT_COUNT_FN_SIGNATURE,
block_number, block_number,
timeout, timeout,
) )
.and_then(|result| match result { .await?;
match result {
None => Err("Deposit root response was none".to_string()), None => Err("Deposit root response was none".to_string()),
Some(bytes) => { Some(bytes) => {
if bytes.is_empty() { if bytes.is_empty() {
@@ -151,7 +145,7 @@ pub fn get_deposit_count(
)) ))
} }
} }
}) }
} }
/// Returns the value of the `get_hash_tree_root()` call at the given `block_number`. /// Returns the value of the `get_hash_tree_root()` call at the given `block_number`.
@@ -159,20 +153,21 @@ pub fn get_deposit_count(
/// Assumes that the `address` has the same ABI as the eth2 deposit contract. /// Assumes that the `address` has the same ABI as the eth2 deposit contract.
/// ///
/// Uses HTTP JSON RPC at `endpoint`. E.g., `http://localhost:8545`. /// Uses HTTP JSON RPC at `endpoint`. E.g., `http://localhost:8545`.
pub fn get_deposit_root( pub async fn get_deposit_root(
endpoint: &str, endpoint: &str,
address: &str, address: &str,
block_number: u64, block_number: u64,
timeout: Duration, timeout: Duration,
) -> impl Future<Item = Option<Hash256>, Error = String> { ) -> Result<Option<Hash256>, String> {
call( let result = call(
endpoint, endpoint,
address, address,
DEPOSIT_ROOT_FN_SIGNATURE, DEPOSIT_ROOT_FN_SIGNATURE,
block_number, block_number,
timeout, timeout,
) )
.and_then(|result| match result { .await?;
match result {
None => Err("Deposit root response was none".to_string()), None => Err("Deposit root response was none".to_string()),
Some(bytes) => { Some(bytes) => {
if bytes.is_empty() { if bytes.is_empty() {
@@ -186,7 +181,7 @@ pub fn get_deposit_root(
)) ))
} }
} }
}) }
} }
/// Performs a instant, no-transaction call to the contract `address` with the given `0x`-prefixed /// Performs a instant, no-transaction call to the contract `address` with the given `0x`-prefixed
@@ -195,13 +190,13 @@ pub fn get_deposit_root(
/// Returns bytes, if any. /// Returns bytes, if any.
/// ///
/// Uses HTTP JSON RPC at `endpoint`. E.g., `http://localhost:8545`. /// Uses HTTP JSON RPC at `endpoint`. E.g., `http://localhost:8545`.
fn call( async fn call(
endpoint: &str, endpoint: &str,
address: &str, address: &str,
hex_data: &str, hex_data: &str,
block_number: u64, block_number: u64,
timeout: Duration, timeout: Duration,
) -> impl Future<Item = Option<Vec<u8>>, Error = String> { ) -> Result<Option<Vec<u8>>, String> {
let params = json! ([ let params = json! ([
{ {
"to": address, "to": address,
@@ -210,7 +205,7 @@ fn call(
format!("0x{:x}", block_number) format!("0x{:x}", block_number)
]); ]);
send_rpc_request(endpoint, "eth_call", params, timeout).and_then(|response_body| { let response_body = send_rpc_request(endpoint, "eth_call", params, timeout).await?;
match response_result(&response_body)? { match response_result(&response_body)? {
None => Ok(None), None => Ok(None),
Some(result) => { Some(result) => {
@@ -222,7 +217,6 @@ fn call(
Ok(Some(hex_to_bytes(&hex)?)) Ok(Some(hex_to_bytes(&hex)?))
} }
} }
})
} }
/// A reduced set of fields from an Eth1 contract log. /// A reduced set of fields from an Eth1 contract log.
@@ -238,12 +232,12 @@ pub struct Log {
/// It's not clear from the Ethereum JSON-RPC docs if this range is inclusive or not. /// It's not clear from the Ethereum JSON-RPC docs if this range is inclusive or not.
/// ///
/// Uses HTTP JSON RPC at `endpoint`. E.g., `http://localhost:8545`. /// Uses HTTP JSON RPC at `endpoint`. E.g., `http://localhost:8545`.
pub fn get_deposit_logs_in_range( pub async fn get_deposit_logs_in_range(
endpoint: &str, endpoint: &str,
address: &str, address: &str,
block_height_range: Range<u64>, block_height_range: Range<u64>,
timeout: Duration, timeout: Duration,
) -> impl Future<Item = Vec<Log>, Error = String> { ) -> Result<Vec<Log>, String> {
let params = json! ([{ let params = json! ([{
"address": address, "address": address,
"topics": [DEPOSIT_EVENT_TOPIC], "topics": [DEPOSIT_EVENT_TOPIC],
@@ -251,8 +245,7 @@ pub fn get_deposit_logs_in_range(
"toBlock": format!("0x{:x}", block_height_range.end), "toBlock": format!("0x{:x}", block_height_range.end),
}]); }]);
send_rpc_request(endpoint, "eth_getLogs", params, timeout) let response_body = send_rpc_request(endpoint, "eth_getLogs", params, timeout).await?;
.and_then(|response_body| {
response_result(&response_body)? response_result(&response_body)?
.ok_or_else(|| "No result field was returned for deposit logs".to_string())? .ok_or_else(|| "No result field was returned for deposit logs".to_string())?
.as_array() .as_array()
@@ -278,19 +271,18 @@ pub fn get_deposit_logs_in_range(
}) })
}) })
.collect::<Result<Vec<Log>, String>>() .collect::<Result<Vec<Log>, String>>()
})
.map_err(|e| format!("Failed to get logs in range: {}", e)) .map_err(|e| format!("Failed to get logs in range: {}", e))
} }
/// Sends an RPC request to `endpoint`, using a POST with the given `body`. /// Sends an RPC request to `endpoint`, using a POST with the given `body`.
/// ///
/// Tries to receive the response and parse the body as a `String`. /// Tries to receive the response and parse the body as a `String`.
pub fn send_rpc_request( pub async fn send_rpc_request(
endpoint: &str, endpoint: &str,
method: &str, method: &str,
params: Value, params: Value,
timeout: Duration, timeout: Duration,
) -> impl Future<Item = String, Error = String> { ) -> Result<String, String> {
let body = json! ({ let body = json! ({
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": method, "method": method,
@@ -303,7 +295,7 @@ pub fn send_rpc_request(
// //
// A better solution would be to create some struct that contains a built client and pass it // A better solution would be to create some struct that contains a built client and pass it
// around (similar to the `web3` crate's `Transport` structs). // around (similar to the `web3` crate's `Transport` structs).
ClientBuilder::new() let response = ClientBuilder::new()
.timeout(timeout) .timeout(timeout)
.build() .build()
.expect("The builder should always build a client") .expect("The builder should always build a client")
@@ -312,35 +304,25 @@ pub fn send_rpc_request(
.body(body) .body(body)
.send() .send()
.map_err(|e| format!("Request failed: {:?}", e)) .map_err(|e| format!("Request failed: {:?}", e))
.and_then(|response| { .await?;
if response.status() != StatusCode::OK { if response.status() != StatusCode::OK {
Err(format!( return Err(format!(
"Response HTTP status was not 200 OK: {}.", "Response HTTP status was not 200 OK: {}.",
response.status() response.status()
)) ));
} else { };
Ok(response) let encoding = response
}
})
.and_then(|response| {
response
.headers() .headers()
.get(CONTENT_TYPE) .get(CONTENT_TYPE)
.ok_or_else(|| "No content-type header in response".to_string()) .ok_or_else(|| "No content-type header in response".to_string())?
.and_then(|encoding| {
encoding
.to_str() .to_str()
.map(|s| s.to_string()) .map(|s| s.to_string())
.map_err(|e| format!("Failed to parse content-type header: {}", e)) .map_err(|e| format!("Failed to parse content-type header: {}", e))?;
})
.map(|encoding| (response, encoding))
})
.and_then(|(response, encoding)| {
response response
.into_body() .bytes()
.concat2()
.map(|chunk| chunk.iter().cloned().collect::<Vec<u8>>())
.map_err(|e| format!("Failed to receive body: {:?}", e)) .map_err(|e| format!("Failed to receive body: {:?}", e))
.await
.and_then(move |bytes| match encoding.as_str() { .and_then(move |bytes| match encoding.as_str() {
"application/json" => Ok(bytes), "application/json" => Ok(bytes),
"application/json; charset=utf-8" => Ok(bytes), "application/json; charset=utf-8" => Ok(bytes),
@@ -348,7 +330,6 @@ pub fn send_rpc_request(
}) })
.map(|bytes| String::from_utf8_lossy(&bytes).into_owned()) .map(|bytes| String::from_utf8_lossy(&bytes).into_owned())
.map_err(|e| format!("Failed to receive body: {:?}", e)) .map_err(|e| format!("Failed to receive body: {:?}", e))
})
} }
/// Accepts an entire HTTP body (as a string) and returns the `result` field, as a serde `Value`. /// Accepts an entire HTTP body (as a string) and returns the `result` field, as a serde `Value`.

View File

@@ -7,16 +7,18 @@ use crate::{
DepositLog, DepositLog,
}; };
use futures::{ use futures::{
future::{loop_fn, Loop}, future::{FutureExt, TryFutureExt},
stream, Future, Stream, stream,
stream::TryStreamExt,
}; };
use parking_lot::{RwLock, RwLockReadGuard}; use parking_lot::{RwLock, RwLockReadGuard};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use slog::{debug, error, trace, Logger}; use slog::{debug, error, trace, Logger};
use std::ops::{Range, RangeInclusive}; use std::ops::{Range, RangeInclusive};
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::Duration;
use tokio::timer::Delay; use tokio::sync::oneshot::error::TryRecvError;
use tokio::time::delay_for;
const STANDARD_TIMEOUT_MILLIS: u64 = 15_000; const STANDARD_TIMEOUT_MILLIS: u64 = 15_000;
@@ -245,29 +247,23 @@ impl Service {
/// - Err(_) if there is an error. /// - Err(_) if there is an error.
/// ///
/// Emits logs for debugging and errors. /// Emits logs for debugging and errors.
pub fn update( pub async fn update(
&self, &self,
) -> impl Future<Item = (DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), Error = String> ) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> {
{
let log_a = self.log.clone();
let log_b = self.log.clone();
let inner_1 = self.inner.clone();
let inner_2 = self.inner.clone();
let deposit_future = self let deposit_future = self
.update_deposit_cache() .update_deposit_cache()
.map_err(|e| format!("Failed to update eth1 cache: {:?}", e)) .map_err(|e| format!("Failed to update eth1 cache: {:?}", e))
.then(move |result| { .then(|result| async move{
match &result { match &result {
Ok(DepositCacheUpdateOutcome::Success { logs_imported }) => trace!( Ok(DepositCacheUpdateOutcome::Success { logs_imported }) => trace!(
log_a, self.log,
"Updated eth1 deposit cache"; "Updated eth1 deposit cache";
"cached_deposits" => inner_1.deposit_cache.read().cache.len(), "cached_deposits" => self.inner.deposit_cache.read().cache.len(),
"logs_imported" => logs_imported, "logs_imported" => logs_imported,
"last_processed_eth1_block" => inner_1.deposit_cache.read().last_processed_block, "last_processed_eth1_block" => self.inner.deposit_cache.read().last_processed_block,
), ),
Err(e) => error!( Err(e) => error!(
log_a, self.log,
"Failed to update eth1 deposit cache"; "Failed to update eth1 deposit cache";
"error" => e "error" => e
), ),
@@ -279,20 +275,20 @@ impl Service {
let block_future = self let block_future = self
.update_block_cache() .update_block_cache()
.map_err(|e| format!("Failed to update eth1 cache: {:?}", e)) .map_err(|e| format!("Failed to update eth1 cache: {:?}", e))
.then(move |result| { .then(|result| async move {
match &result { match &result {
Ok(BlockCacheUpdateOutcome::Success { Ok(BlockCacheUpdateOutcome::Success {
blocks_imported, blocks_imported,
head_block_number, head_block_number,
}) => trace!( }) => trace!(
log_b, self.log,
"Updated eth1 block cache"; "Updated eth1 block cache";
"cached_blocks" => inner_2.block_cache.read().len(), "cached_blocks" => self.inner.block_cache.read().len(),
"blocks_imported" => blocks_imported, "blocks_imported" => blocks_imported,
"head_block" => head_block_number, "head_block" => head_block_number,
), ),
Err(e) => error!( Err(e) => error!(
log_b, self.log,
"Failed to update eth1 block cache"; "Failed to update eth1 block cache";
"error" => e "error" => e
), ),
@@ -301,7 +297,7 @@ impl Service {
result result
}); });
deposit_future.join(block_future) futures::try_join!(deposit_future, block_future)
} }
/// A looping future that updates the cache, then waits `config.auto_update_interval` before /// A looping future that updates the cache, then waits `config.auto_update_interval` before
@@ -313,31 +309,24 @@ impl Service {
/// - Err(_) if there is an error. /// - Err(_) if there is an error.
/// ///
/// Emits logs for debugging and errors. /// Emits logs for debugging and errors.
pub fn auto_update( pub async fn auto_update(
&self, &self,
exit: tokio::sync::oneshot::Receiver<()>, mut exit: tokio::sync::oneshot::Receiver<()>,
) -> impl Future<Item = (), Error = ()> { ) -> Result<(), ()> {
let service = self.clone();
let log = self.log.clone();
let update_interval = Duration::from_millis(self.config().auto_update_interval_millis); let update_interval = Duration::from_millis(self.config().auto_update_interval_millis);
let loop_future = loop_fn((), move |()| { loop {
let service = service.clone(); let update_result = self.update().await;
let log_a = log.clone();
let log_b = log.clone();
service
.update()
.then(move |update_result| {
match update_result { match update_result {
Err(e) => error!( Err(e) => error!(
log_a, self.log,
"Failed to update eth1 cache"; "Failed to update eth1 cache";
"retry_millis" => update_interval.as_millis(), "retry_millis" => update_interval.as_millis(),
"error" => e, "error" => e,
), ),
Ok((deposit, block)) => debug!( Ok((deposit, block)) => debug!(
log_a, self.log,
"Updated eth1 cache"; "Updated eth1 cache";
"retry_millis" => update_interval.as_millis(), "retry_millis" => update_interval.as_millis(),
"blocks" => format!("{:?}", block), "blocks" => format!("{:?}", block),
@@ -345,24 +334,15 @@ impl Service {
), ),
}; };
// Do not break the loop if there is an update failure. // WARNING: delay_for doesn't return an error and panics on error.
Ok(()) delay_for(update_interval).await;
}) match exit.try_recv() {
.and_then(move |_| Delay::new(Instant::now() + update_interval)) Ok(_) | Err(TryRecvError::Closed) => break,
.then(move |timer_result| { Err(TryRecvError::Empty) => {}
if let Err(e) = timer_result { }
error!(
log_b,
"Failed to trigger eth1 cache update delay";
"error" => format!("{:?}", e),
);
} }
// Do not break the loop if there is an timer failure.
Ok(Loop::Continue(()))
})
});
loop_future.select(exit).map(|_| ()).map_err(|_| ()) Ok(())
} }
/// Contacts the remote eth1 node and attempts to import deposit logs up to the configured /// Contacts the remote eth1 node and attempts to import deposit logs up to the configured
@@ -377,11 +357,7 @@ impl Service {
/// - Err(_) if there is an error. /// - Err(_) if there is an error.
/// ///
/// Emits logs for debugging and errors. /// Emits logs for debugging and errors.
pub fn update_deposit_cache( pub async fn update_deposit_cache(&self) -> Result<DepositCacheUpdateOutcome, Error> {
&self,
) -> impl Future<Item = DepositCacheUpdateOutcome, Error = Error> {
let service_1 = self.clone();
let service_2 = self.clone();
let blocks_per_log_query = self.config().blocks_per_log_query; let blocks_per_log_query = self.config().blocks_per_log_query;
let max_log_requests_per_update = self let max_log_requests_per_update = self
.config() .config()
@@ -395,14 +371,14 @@ impl Service {
.map(|n| n + 1) .map(|n| n + 1)
.unwrap_or_else(|| self.config().deposit_contract_deploy_block); .unwrap_or_else(|| self.config().deposit_contract_deploy_block);
get_new_block_numbers( let range = get_new_block_numbers(
&self.config().endpoint, &self.config().endpoint,
next_required_block, next_required_block,
self.config().follow_distance, self.config().follow_distance,
) )
.map(move |range| { .await?;
range
.map(|range| { let block_number_chunks = if let Some(range) = range {
range range
.collect::<Vec<u64>>() .collect::<Vec<u64>>()
.chunks(blocks_per_log_query) .chunks(blocks_per_log_query)
@@ -413,41 +389,38 @@ impl Service {
first..last first..last
}) })
.collect::<Vec<Range<u64>>>() .collect::<Vec<Range<u64>>>()
}) } else {
.unwrap_or_else(|| vec![]) Vec::new()
}) };
.and_then(move |block_number_chunks| {
stream::unfold( stream::try_unfold(block_number_chunks.into_iter(), |mut chunks| async move {
block_number_chunks.into_iter(), match chunks.next() {
move |mut chunks| match chunks.next() {
Some(chunk) => { Some(chunk) => {
let chunk_1 = chunk.clone(); let chunk_1 = chunk.clone();
Some( match get_deposit_logs_in_range(
get_deposit_logs_in_range( &self.config().endpoint,
&service_1.config().endpoint, &self.config().deposit_contract_address,
&service_1.config().deposit_contract_address,
chunk, chunk,
Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS), Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS),
) )
.map_err(Error::GetDepositLogsFailed) .await
.map(|logs| (chunk_1, logs)) {
.map(|logs| (logs, chunks)), Ok(logs) => Ok(Some(((chunk_1, logs), chunks))),
) Err(e) => Err(Error::GetDepositLogsFailed(e)),
} }
None => None, }
}, None => Ok(None),
) }
.fold(0, move |mut sum, (block_range, log_chunk)| { })
let mut cache = service_2.deposits().write(); .try_fold(0, |mut sum: usize, (block_range, log_chunk)| async move {
let mut cache = self.deposits().write();
log_chunk log_chunk
.into_iter() .into_iter()
.map(|raw_log| { .map(|raw_log| {
DepositLog::from_log(&raw_log).map_err(|error| { DepositLog::from_log(&raw_log).map_err(|error| Error::FailedToParseDepositLog {
Error::FailedToParseDepositLog {
block_range: block_range.clone(), block_range: block_range.clone(),
error, error,
}
}) })
}) })
// Return early if any of the logs cannot be parsed. // Return early if any of the logs cannot be parsed.
@@ -484,8 +457,12 @@ impl Service {
Ok(sum) Ok(sum)
}) })
.map(|logs_imported| DepositCacheUpdateOutcome::Success { logs_imported }) .map(|logs_imported| {
Ok(DepositCacheUpdateOutcome::Success {
logs_imported: logs_imported?,
}) })
})
.await
} }
/// Contacts the remote eth1 node and attempts to import all blocks up to the configured /// Contacts the remote eth1 node and attempts to import all blocks up to the configured
@@ -499,44 +476,37 @@ impl Service {
/// - Err(_) if there is an error. /// - Err(_) if there is an error.
/// ///
/// Emits logs for debugging and errors. /// Emits logs for debugging and errors.
pub fn update_block_cache(&self) -> impl Future<Item = BlockCacheUpdateOutcome, Error = Error> { pub async fn update_block_cache(&self) -> Result<BlockCacheUpdateOutcome, Error> {
let cache_1 = self.inner.clone();
let cache_2 = self.inner.clone();
let cache_3 = self.inner.clone();
let cache_4 = self.inner.clone();
let cache_5 = self.inner.clone();
let cache_6 = self.inner.clone();
let block_cache_truncation = self.config().block_cache_truncation; let block_cache_truncation = self.config().block_cache_truncation;
let max_blocks_per_update = self let max_blocks_per_update = self
.config() .config()
.max_blocks_per_update .max_blocks_per_update
.unwrap_or_else(usize::max_value); .unwrap_or_else(usize::max_value);
let next_required_block = cache_1 let next_required_block = self
.inner
.block_cache .block_cache
.read() .read()
.highest_block_number() .highest_block_number()
.map(|n| n + 1) .map(|n| n + 1)
.unwrap_or_else(|| self.config().lowest_cached_block_number); .unwrap_or_else(|| self.config().lowest_cached_block_number);
get_new_block_numbers( let range = get_new_block_numbers(
&self.config().endpoint, &self.config().endpoint,
next_required_block, next_required_block,
self.config().follow_distance, self.config().follow_distance,
) )
.await?;
// Map the range of required blocks into a Vec. // Map the range of required blocks into a Vec.
// //
// If the required range is larger than the size of the cache, drop the exiting cache // If the required range is larger than the size of the cache, drop the exiting cache
// because it's exipred and just download enough blocks to fill the cache. // because it's exipred and just download enough blocks to fill the cache.
.and_then(move |range| { let required_block_numbers = if let Some(range) = range {
range
.map(|range| {
if range.start() > range.end() { if range.start() > range.end() {
// Note: this check is not strictly necessary, however it remains to safe // Note: this check is not strictly necessary, however it remains to safe
// guard against any regression which may cause an underflow in a following // guard against any regression which may cause an underflow in a following
// subtraction operation. // subtraction operation.
Err(Error::Internal("Range was not increasing".into())) return Err(Error::Internal("Range was not increasing".into()));
} else { } else {
let range_size = range.end() - range.start(); let range_size = range.end() - range.start();
let max_size = block_cache_truncation let max_size = block_cache_truncation
@@ -546,19 +516,19 @@ impl Service {
// If the range of required blocks is larger than `max_size`, drop all // If the range of required blocks is larger than `max_size`, drop all
// existing blocks and download `max_size` count of blocks. // existing blocks and download `max_size` count of blocks.
let first_block = range.end() - max_size; let first_block = range.end() - max_size;
(*cache_5.block_cache.write()) = BlockCache::default(); (*self.inner.block_cache.write()) = BlockCache::default();
Ok((first_block..=*range.end()).collect::<Vec<u64>>()) (first_block..=*range.end()).collect::<Vec<u64>>()
} else { } else {
Ok(range.collect::<Vec<u64>>()) range.collect::<Vec<u64>>()
} }
} }
}) } else {
.unwrap_or_else(|| Ok(vec![])) Vec::new()
}) };
// Download the range of blocks and sequentially import them into the cache. // Download the range of blocks and sequentially import them into the cache.
.and_then(move |required_block_numbers| {
// Last processed block in deposit cache // Last processed block in deposit cache
let latest_in_cache = cache_6 let latest_in_cache = self
.inner
.deposit_cache .deposit_cache
.read() .read()
.last_processed_block .last_processed_block
@@ -571,18 +541,25 @@ impl Service {
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// Produce a stream from the list of required block numbers and return a future that // Produce a stream from the list of required block numbers and return a future that
// consumes the it. // consumes the it.
stream::unfold(
let eth1_blocks = stream::try_unfold(
required_block_numbers.into_iter(), required_block_numbers.into_iter(),
move |mut block_numbers| match block_numbers.next() { |mut block_numbers| async move {
Some(block_number) => Some( match block_numbers.next() {
download_eth1_block(cache_2.clone(), block_number) Some(block_number) => {
.map(|v| (v, block_numbers)), match download_eth1_block(self.inner.clone(), block_number).await {
), Ok(eth1_block) => Ok(Some((eth1_block, block_numbers))),
None => None, Err(e) => Err(e),
}
}
None => Ok(None),
}
}, },
) );
.fold(0, move |sum, eth1_block| {
cache_3 let blocks_imported = eth1_blocks
.try_fold(0, |sum: usize, eth1_block| async move {
self.inner
.block_cache .block_cache
.write() .write()
.insert_root_or_child(eth1_block) .insert_root_or_child(eth1_block)
@@ -590,11 +567,11 @@ impl Service {
metrics::set_gauge( metrics::set_gauge(
&metrics::BLOCK_CACHE_LEN, &metrics::BLOCK_CACHE_LEN,
cache_3.block_cache.read().len() as i64, self.inner.block_cache.read().len() as i64,
); );
metrics::set_gauge( metrics::set_gauge(
&metrics::LATEST_CACHED_BLOCK_TIMESTAMP, &metrics::LATEST_CACHED_BLOCK_TIMESTAMP,
cache_3 self.inner
.block_cache .block_cache
.read() .read()
.latest_block_timestamp() .latest_block_timestamp()
@@ -603,34 +580,33 @@ impl Service {
Ok(sum + 1) Ok(sum + 1)
}) })
}) .await?;
.and_then(move |blocks_imported| {
// Prune the block cache, preventing it from growing too large. // Prune the block cache, preventing it from growing too large.
cache_4.prune_blocks(); self.inner.prune_blocks();
metrics::set_gauge( metrics::set_gauge(
&metrics::BLOCK_CACHE_LEN, &metrics::BLOCK_CACHE_LEN,
cache_4.block_cache.read().len() as i64, self.inner.block_cache.read().len() as i64,
); );
Ok(BlockCacheUpdateOutcome::Success { Ok(BlockCacheUpdateOutcome::Success {
blocks_imported, blocks_imported,
head_block_number: cache_4.block_cache.read().highest_block_number(), head_block_number: self.inner.block_cache.read().highest_block_number(),
})
}) })
} }
} }
/// Determine the range of blocks that need to be downloaded, given the remotes best block and /// Determine the range of blocks that need to be downloaded, given the remotes best block and
/// the locally stored best block. /// the locally stored best block.
fn get_new_block_numbers<'a>( async fn get_new_block_numbers<'a>(
endpoint: &str, endpoint: &str,
next_required_block: u64, next_required_block: u64,
follow_distance: u64, follow_distance: u64,
) -> impl Future<Item = Option<RangeInclusive<u64>>, Error = Error> + 'a { ) -> Result<Option<RangeInclusive<u64>>, Error> {
let remote_highest_block =
get_block_number(endpoint, Duration::from_millis(BLOCK_NUMBER_TIMEOUT_MILLIS)) get_block_number(endpoint, Duration::from_millis(BLOCK_NUMBER_TIMEOUT_MILLIS))
.map_err(Error::GetBlockNumberFailed) .map_err(Error::GetBlockNumberFailed)
.and_then(move |remote_highest_block| { .await?;
let remote_follow_block = remote_highest_block.saturating_sub(follow_distance); let remote_follow_block = remote_highest_block.saturating_sub(follow_distance);
if next_required_block <= remote_follow_block { if next_required_block <= remote_follow_block {
@@ -650,17 +626,13 @@ fn get_new_block_numbers<'a>(
// Return an empty range. // Return an empty range.
Ok(None) Ok(None)
} }
})
} }
/// Downloads the `(block, deposit_root, deposit_count)` tuple from an eth1 node for the given /// Downloads the `(block, deposit_root, deposit_count)` tuple from an eth1 node for the given
/// `block_number`. /// `block_number`.
/// ///
/// Performs three async calls to an Eth1 HTTP JSON RPC endpoint. /// Performs three async calls to an Eth1 HTTP JSON RPC endpoint.
fn download_eth1_block<'a>( async fn download_eth1_block(cache: Arc<Inner>, block_number: u64) -> Result<Eth1Block, Error> {
cache: Arc<Inner>,
block_number: u64,
) -> impl Future<Item = Eth1Block, Error = Error> + 'a {
let deposit_root = cache let deposit_root = cache
.deposit_cache .deposit_cache
.read() .read()
@@ -672,13 +644,14 @@ fn download_eth1_block<'a>(
.cache .cache
.get_deposit_count_from_cache(block_number); .get_deposit_count_from_cache(block_number);
// Performs a `get_blockByNumber` call to an eth1 node. // Performs a `get_blockByNumber` call to an eth1 node.
get_block( let http_block = get_block(
&cache.config.read().endpoint, &cache.config.read().endpoint,
block_number, block_number,
Duration::from_millis(GET_BLOCK_TIMEOUT_MILLIS), Duration::from_millis(GET_BLOCK_TIMEOUT_MILLIS),
) )
.map_err(Error::BlockDownloadFailed) .map_err(Error::BlockDownloadFailed)
.map(move |http_block| Eth1Block { .await?;
Ok(Eth1Block {
hash: http_block.hash, hash: http_block.hash,
number: http_block.number, number: http_block.number,
timestamp: http_block.timestamp, timestamp: http_block.timestamp,