Remove fallback support from eth1 service (#3594)

## Issue Addressed

N/A

## Proposed Changes

With https://github.com/sigp/lighthouse/pull/3214 we made it such that you can either have 1 auth endpoint or multiple non auth endpoints. Now that we are post merge on all networks (testnets and mainnet), we cannot progress a chain without a dedicated auth execution layer connection so there is no point in having a non-auth eth1-endpoint for syncing deposit cache. 

This code removes all fallback related code in the eth1 service. We still keep the single non-auth endpoint since it's useful for testing.

## Additional Info

This removes all eth1 fallback related metrics that were relevant for the monitoring service, so we might need to change the api upstream.
This commit is contained in:
Pawan Dhananjay
2022-10-04 08:33:39 +00:00
parent 58bd2f76d0
commit 8728c40102
22 changed files with 228 additions and 802 deletions

View File

@@ -31,5 +31,4 @@ lighthouse_metrics = { path = "../../common/lighthouse_metrics"}
lazy_static = "1.4.0"
task_executor = { path = "../../common/task_executor" }
eth2 = { path = "../../common/eth2" }
fallback = { path = "../../common/fallback" }
sensitive_url = { path = "../../common/sensitive_url" }

View File

@@ -1,14 +1,14 @@
use crate::service::endpoint_from_config;
use crate::Config;
use crate::{
block_cache::{BlockCache, Eth1Block},
deposit_cache::{DepositCache, SszDepositCache},
service::EndpointsCache,
};
use execution_layer::HttpJsonRpc;
use parking_lot::RwLock;
use ssz::four_byte_option_impl;
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use std::sync::Arc;
use types::ChainSpec;
// Define "legacy" implementations of `Option<u64>` which use four bytes for encoding the union
@@ -31,11 +31,10 @@ impl DepositUpdater {
}
}
#[derive(Default)]
pub struct Inner {
pub block_cache: RwLock<BlockCache>,
pub deposit_cache: RwLock<DepositUpdater>,
pub endpoints_cache: RwLock<Option<Arc<EndpointsCache>>>,
pub endpoint: HttpJsonRpc,
pub config: RwLock<Config>,
pub remote_head_block: RwLock<Option<Eth1Block>>,
pub spec: ChainSpec,
@@ -96,7 +95,8 @@ impl SszEth1Cache {
cache: self.deposit_cache.to_deposit_cache()?,
last_processed_block: self.last_processed_block,
}),
endpoints_cache: RwLock::new(None),
endpoint: endpoint_from_config(&config)
.map_err(|e| format!("Failed to create endpoint: {:?}", e))?,
// Set the remote head_block zero when creating a new instance. We only care about
// present and future eth1 nodes.
remote_head_block: RwLock::new(None),

View File

@@ -17,16 +17,6 @@ lazy_static! {
pub static ref HIGHEST_PROCESSED_DEPOSIT_BLOCK: Result<IntGauge> =
try_create_int_gauge("eth1_highest_processed_deposit_block", "Number of the last block checked for deposits");
/*
* Eth1 endpoint errors
*/
pub static ref ENDPOINT_ERRORS: Result<IntCounterVec> = try_create_int_counter_vec(
"eth1_endpoint_errors", "The number of eth1 request errors for each endpoint", &["endpoint"]
);
pub static ref ENDPOINT_REQUESTS: Result<IntCounterVec> = try_create_int_counter_vec(
"eth1_endpoint_requests", "The number of eth1 requests for each endpoint", &["endpoint"]
);
/*
* Eth1 rpc connection
*/
@@ -35,14 +25,4 @@ lazy_static! {
"sync_eth1_connected", "Set to 1 if connected to an eth1 node, otherwise set to 0"
);
pub static ref ETH1_FALLBACK_CONFIGURED: Result<IntGauge> = try_create_int_gauge(
"sync_eth1_fallback_configured", "Number of configured eth1 fallbacks"
);
// Note: This metric only checks if an eth1 fallback is configured, not if it is connected and synced.
// Checking for liveness of the fallback would require moving away from lazy checking of fallbacks.
pub static ref ETH1_FALLBACK_CONNECTED: Result<IntGauge> = try_create_int_gauge(
"eth1_sync_fallback_connected", "Set to 1 if an eth1 fallback is connected, otherwise set to 0"
);
}

View File

@@ -9,19 +9,16 @@ use execution_layer::http::{
deposit_methods::{BlockQuery, Eth1Id},
HttpJsonRpc,
};
use fallback::{Fallback, FallbackError};
use futures::future::TryFutureExt;
use parking_lot::{RwLock, RwLockReadGuard};
use sensitive_url::SensitiveUrl;
use serde::{Deserialize, Serialize};
use slog::{debug, error, info, trace, warn, Logger};
use std::fmt::Debug;
use std::future::Future;
use std::ops::{Range, RangeInclusive};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock as TRwLock;
use tokio::time::{interval_at, Duration, Instant};
use types::{ChainSpec, EthSpec, Unsigned};
@@ -53,127 +50,12 @@ const CACHE_FACTOR: u64 = 2;
#[derive(Debug, PartialEq, Clone)]
pub enum EndpointError {
RequestFailed(String),
WrongNetworkId,
WrongChainId,
FarBehind,
}
type EndpointState = Result<(), EndpointError>;
pub struct EndpointWithState {
client: HttpJsonRpc,
state: TRwLock<Option<EndpointState>>,
}
impl EndpointWithState {
pub fn new(client: HttpJsonRpc) -> Self {
Self {
client,
state: TRwLock::new(None),
}
}
}
async fn reset_endpoint_state(endpoint: &EndpointWithState) {
*endpoint.state.write().await = None;
}
async fn get_state(endpoint: &EndpointWithState) -> Option<EndpointState> {
endpoint.state.read().await.clone()
}
/// A cache structure to lazily check usability of endpoints. An endpoint is usable if it is
/// reachable and has the correct network id and chain id. Emits a `WARN` log if a checked endpoint
/// is not usable.
pub struct EndpointsCache {
pub fallback: Fallback<EndpointWithState>,
pub config_chain_id: Eth1Id,
pub log: Logger,
}
impl EndpointsCache {
/// Checks the usability of an endpoint. Results get cached and therefore only the first call
/// for each endpoint does the real check.
async fn state(&self, endpoint: &EndpointWithState) -> EndpointState {
if let Some(result) = endpoint.state.read().await.clone() {
return result;
}
let mut value = endpoint.state.write().await;
if let Some(result) = value.clone() {
return result;
}
crate::metrics::inc_counter_vec(
&crate::metrics::ENDPOINT_REQUESTS,
&[&endpoint.client.to_string()],
);
let state = endpoint_state(&endpoint.client, &self.config_chain_id, &self.log).await;
*value = Some(state.clone());
if state.is_err() {
crate::metrics::inc_counter_vec(
&crate::metrics::ENDPOINT_ERRORS,
&[&endpoint.client.to_string()],
);
crate::metrics::set_gauge(&metrics::ETH1_CONNECTED, 0);
} else {
crate::metrics::set_gauge(&metrics::ETH1_CONNECTED, 1);
}
state
}
/// Return the first successful result along with number of previous errors encountered
/// or all the errors encountered if every none of the fallback endpoints return required output.
pub async fn first_success<'a, F, O, R>(
&'a self,
func: F,
) -> Result<(O, usize), FallbackError<SingleEndpointError>>
where
F: Fn(&'a HttpJsonRpc) -> R,
R: Future<Output = Result<O, SingleEndpointError>>,
{
let func = &func;
self.fallback
.first_success(|endpoint| async move {
match self.state(endpoint).await {
Ok(()) => {
let endpoint_str = &endpoint.client.to_string();
crate::metrics::inc_counter_vec(
&crate::metrics::ENDPOINT_REQUESTS,
&[endpoint_str],
);
match func(&endpoint.client).await {
Ok(t) => Ok(t),
Err(t) => {
crate::metrics::inc_counter_vec(
&crate::metrics::ENDPOINT_ERRORS,
&[endpoint_str],
);
if let SingleEndpointError::EndpointError(e) = &t {
*endpoint.state.write().await = Some(Err(e.clone()));
} else {
// A non-`EndpointError` error occurred, so reset the state.
reset_endpoint_state(endpoint).await;
}
Err(t)
}
}
}
Err(e) => Err(SingleEndpointError::EndpointError(e)),
}
})
.await
}
pub async fn reset_errorred_endpoints(&self) {
for endpoint in &self.fallback.servers {
if let Some(state) = get_state(endpoint).await {
if state.is_err() {
reset_endpoint_state(endpoint).await;
}
}
}
}
}
/// Returns `Ok` if the endpoint is usable, i.e. is reachable and has a correct network id and
/// chain id. Otherwise it returns `Err`.
async fn endpoint_state(
@@ -186,7 +68,6 @@ async fn endpoint_state(
log,
"Error connecting to eth1 node endpoint";
"endpoint" => %endpoint,
"action" => "trying fallbacks"
);
EndpointError::RequestFailed(e)
};
@@ -202,7 +83,6 @@ async fn endpoint_state(
log,
"Remote execution node is not synced";
"endpoint" => %endpoint,
"action" => "trying fallbacks"
);
return Err(EndpointError::FarBehind);
}
@@ -211,7 +91,6 @@ async fn endpoint_state(
log,
"Invalid execution chain ID. Please switch to correct chain ID on endpoint";
"endpoint" => %endpoint,
"action" => "trying fallbacks",
"expected" => ?config_chain_id,
"received" => ?chain_id,
);
@@ -240,7 +119,7 @@ async fn get_remote_head_and_new_block_ranges(
Option<RangeInclusive<u64>>,
Option<RangeInclusive<u64>>,
),
SingleEndpointError,
Error,
> {
let remote_head_block = download_eth1_block(endpoint, service.inner.clone(), None).await?;
let now = SystemTime::now()
@@ -253,18 +132,16 @@ async fn get_remote_head_and_new_block_ranges(
"Execution endpoint is not synced";
"endpoint" => %endpoint,
"last_seen_block_unix_timestamp" => remote_head_block.timestamp,
"action" => "trying fallback"
);
return Err(SingleEndpointError::EndpointError(EndpointError::FarBehind));
return Err(Error::EndpointError(EndpointError::FarBehind));
}
let handle_remote_not_synced = |e| {
if let SingleEndpointError::RemoteNotSynced { .. } = e {
if let Error::RemoteNotSynced { .. } = e {
warn!(
service.log,
"Execution endpoint is not synced";
"endpoint" => %endpoint,
"action" => "trying fallbacks"
);
}
e
@@ -296,16 +173,25 @@ async fn relevant_new_block_numbers_from_endpoint(
endpoint: &HttpJsonRpc,
service: &Service,
head_type: HeadType,
) -> Result<Option<RangeInclusive<u64>>, SingleEndpointError> {
) -> Result<Option<RangeInclusive<u64>>, Error> {
let remote_highest_block = endpoint
.get_block_number(Duration::from_millis(BLOCK_NUMBER_TIMEOUT_MILLIS))
.map_err(SingleEndpointError::GetBlockNumberFailed)
.map_err(Error::GetBlockNumberFailed)
.await?;
service.relevant_new_block_numbers(remote_highest_block, None, head_type)
}
#[derive(Debug, PartialEq)]
pub enum SingleEndpointError {
pub enum Error {
/// There was an inconsistency when adding a block to the cache.
FailedToInsertEth1Block(BlockCacheError),
/// There was an inconsistency when adding a deposit to the cache.
FailedToInsertDeposit(DepositCacheError),
/// A log downloaded from the eth1 contract was not well formed.
FailedToParseDepositLog {
block_range: Range<u64>,
error: String,
},
/// Endpoint is currently not functional.
EndpointError(EndpointError),
/// The remote node is less synced that we expect, it is not useful until has done more
@@ -325,21 +211,6 @@ pub enum SingleEndpointError {
GetDepositCountFailed(String),
/// Failed to read the deposit contract root from the eth1 node.
GetDepositLogsFailed(String),
}
#[derive(Debug, PartialEq)]
pub enum Error {
/// There was an inconsistency when adding a block to the cache.
FailedToInsertEth1Block(BlockCacheError),
/// There was an inconsistency when adding a deposit to the cache.
FailedToInsertDeposit(DepositCacheError),
/// A log downloaded from the eth1 contract was not well formed.
FailedToParseDepositLog {
block_range: Range<u64>,
error: String,
},
/// All possible endpoints returned a `SingleEndpointError`.
FallbackError(FallbackError<SingleEndpointError>),
/// There was an unexpected internal error.
Internal(String),
}
@@ -367,21 +238,14 @@ pub enum Eth1Endpoint {
jwt_id: Option<String>,
jwt_version: Option<String>,
},
NoAuth(Vec<SensitiveUrl>),
NoAuth(SensitiveUrl),
}
impl Eth1Endpoint {
fn len(&self) -> usize {
pub fn get_endpoint(&self) -> SensitiveUrl {
match &self {
Self::Auth { .. } => 1,
Self::NoAuth(urls) => urls.len(),
}
}
pub fn get_endpoints(&self) -> Vec<SensitiveUrl> {
match &self {
Self::Auth { endpoint, .. } => vec![endpoint.clone()],
Self::NoAuth(endpoints) => endpoints.clone(),
Self::Auth { endpoint, .. } => endpoint.clone(),
Self::NoAuth(endpoint) => endpoint.clone(),
}
}
}
@@ -389,7 +253,7 @@ impl Eth1Endpoint {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
/// An Eth1 node (e.g., Geth) running a HTTP JSON-RPC endpoint.
pub endpoints: Eth1Endpoint,
pub endpoint: Eth1Endpoint,
/// The address the `BlockCache` and `DepositCache` should assume is the canonical deposit contract.
pub deposit_contract_address: String,
/// The eth1 chain id where the deposit contract is deployed (Goerli/Mainnet).
@@ -466,8 +330,10 @@ impl Config {
impl Default for Config {
fn default() -> Self {
Self {
endpoints: Eth1Endpoint::NoAuth(vec![SensitiveUrl::parse(DEFAULT_ETH1_ENDPOINT)
.expect("The default Eth1 endpoint must always be a valid URL.")]),
endpoint: Eth1Endpoint::NoAuth(
SensitiveUrl::parse(DEFAULT_ETH1_ENDPOINT)
.expect("The default Eth1 endpoint must always be a valid URL."),
),
deposit_contract_address: "0x0000000000000000000000000000000000000000".into(),
chain_id: DEFAULT_CHAIN_ID,
deposit_contract_deploy_block: 1,
@@ -485,6 +351,24 @@ impl Default for Config {
}
}
pub fn endpoint_from_config(config: &Config) -> Result<HttpJsonRpc, String> {
match config.endpoint.clone() {
Eth1Endpoint::Auth {
endpoint,
jwt_path,
jwt_id,
jwt_version,
} => {
let auth = Auth::new_with_path(jwt_path, jwt_id, jwt_version)
.map_err(|e| format!("Failed to initialize jwt auth: {:?}", e))?;
HttpJsonRpc::new_with_auth(endpoint, auth)
.map_err(|e| format!("Failed to create eth1 json rpc client: {:?}", e))
}
Eth1Endpoint::NoAuth(endpoint) => HttpJsonRpc::new(endpoint)
.map_err(|e| format!("Failed to create eth1 json rpc client: {:?}", e)),
}
}
/// Provides a set of Eth1 caches and async functions to update them.
///
/// Stores the following caches:
@@ -499,20 +383,24 @@ pub struct Service {
impl Service {
/// Creates a new service. Does not attempt to connect to the eth1 node.
pub fn new(config: Config, log: Logger, spec: ChainSpec) -> Self {
Self {
pub fn new(config: Config, log: Logger, spec: ChainSpec) -> Result<Self, String> {
Ok(Self {
inner: Arc::new(Inner {
block_cache: <_>::default(),
deposit_cache: RwLock::new(DepositUpdater::new(
config.deposit_contract_deploy_block,
)),
endpoints_cache: RwLock::new(None),
endpoint: endpoint_from_config(&config)?,
remote_head_block: RwLock::new(None),
config: RwLock::new(config),
spec,
}),
log,
}
})
}
pub fn client(&self) -> &HttpJsonRpc {
&self.inner.endpoint
}
/// Returns the follow distance that has been shortened to accommodate for differences in the
@@ -676,52 +564,6 @@ impl Service {
self.inner.config.write().lowest_cached_block_number = block_number;
}
/// Builds a new `EndpointsCache` with empty states.
pub fn init_endpoints(&self) -> Result<Arc<EndpointsCache>, String> {
let endpoints = self.config().endpoints.clone();
let config_chain_id = self.config().chain_id.clone();
let servers = match endpoints {
Eth1Endpoint::Auth {
jwt_path,
endpoint,
jwt_id,
jwt_version,
} => {
let auth = Auth::new_with_path(jwt_path, jwt_id, jwt_version)
.map_err(|e| format!("Failed to initialize jwt auth: {:?}", e))?;
vec![HttpJsonRpc::new_with_auth(endpoint, auth)
.map_err(|e| format!("Failed to build auth enabled json rpc {:?}", e))?]
}
Eth1Endpoint::NoAuth(urls) => urls
.into_iter()
.map(|url| {
HttpJsonRpc::new(url).map_err(|e| format!("Failed to build json rpc {:?}", e))
})
.collect::<Result<_, _>>()?,
};
let new_cache = Arc::new(EndpointsCache {
fallback: Fallback::new(servers.into_iter().map(EndpointWithState::new).collect()),
config_chain_id,
log: self.log.clone(),
});
let mut endpoints_cache = self.inner.endpoints_cache.write();
*endpoints_cache = Some(new_cache.clone());
Ok(new_cache)
}
/// Returns the cached `EndpointsCache` if it exists or builds a new one.
pub fn get_endpoints(&self) -> Result<Arc<EndpointsCache>, String> {
let endpoints_cache = self.inner.endpoints_cache.read();
if let Some(cache) = endpoints_cache.clone() {
Ok(cache)
} else {
drop(endpoints_cache);
self.init_endpoints()
}
}
/// Update the deposit and block cache, returning an error if either fail.
///
/// ## Returns
@@ -733,56 +575,28 @@ impl Service {
pub async fn update(
&self,
) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> {
let endpoints = self.get_endpoints()?;
// Reset the state of any endpoints which have errored so their state can be redetermined.
endpoints.reset_errorred_endpoints().await;
let client = self.client();
let log = self.log.clone();
let chain_id = self.config().chain_id.clone();
let node_far_behind_seconds = self.inner.config.read().node_far_behind_seconds;
let process_single_err = |e: &FallbackError<SingleEndpointError>| {
match e {
FallbackError::AllErrored(errors) => {
if errors
.iter()
.all(|error| matches!(error, SingleEndpointError::EndpointError(_)))
{
error!(
self.log,
"No synced execution endpoint";
"advice" => "ensure you have an execution node configured via \
--execution-endpoint or if pre-merge, --eth1-endpoints"
);
}
}
match endpoint_state(client, &chain_id, &log).await {
Ok(()) => crate::metrics::set_gauge(&metrics::ETH1_CONNECTED, 1),
Err(e) => {
crate::metrics::set_gauge(&metrics::ETH1_CONNECTED, 0);
return Err(format!("Invalid endpoint state: {:?}", e));
}
endpoints.fallback.map_format_error(|s| &s.client, e)
};
let process_err = |e: Error| match &e {
Error::FallbackError(f) => process_single_err(f),
e => format!("{:?}", e),
};
let (
(remote_head_block, new_block_numbers_deposit, new_block_numbers_block_cache),
num_errors,
) = endpoints
.first_success(|e| async move {
get_remote_head_and_new_block_ranges(e, self, node_far_behind_seconds).await
})
.await
.map_err(|e| format!("{:?}", process_single_err(&e)))?;
if num_errors > 0 {
info!(self.log, "Fetched data from fallback"; "fallback_number" => num_errors);
}
let (remote_head_block, new_block_numbers_deposit, new_block_numbers_block_cache) =
get_remote_head_and_new_block_ranges(client, self, node_far_behind_seconds)
.await
.map_err(|e| format!("Failed to get remote head and new block ranges: {:?}", e))?;
*self.inner.remote_head_block.write() = Some(remote_head_block);
let update_deposit_cache = async {
let outcome_result = self
.update_deposit_cache(Some(new_block_numbers_deposit), &endpoints)
.update_deposit_cache(Some(new_block_numbers_deposit))
.await;
// Reset the `last_procesed block` to the last valid deposit's block number.
@@ -804,8 +618,8 @@ impl Service {
deposit_cache.last_processed_block = deposit_cache.cache.latest_block_number();
}
let outcome = outcome_result
.map_err(|e| format!("Failed to update deposit cache: {:?}", process_err(e)))?;
let outcome =
outcome_result.map_err(|e| format!("Failed to update deposit cache: {:?}", e))?;
trace!(
self.log,
@@ -819,14 +633,9 @@ impl Service {
let update_block_cache = async {
let outcome = self
.update_block_cache(Some(new_block_numbers_block_cache), &endpoints)
.update_block_cache(Some(new_block_numbers_block_cache))
.await
.map_err(|e| {
format!(
"Failed to update deposit contract block cache: {:?}",
process_err(e)
)
})?;
.map_err(|e| format!("Failed to update deposit contract block cache: {:?}", e))?;
trace!(
self.log,
@@ -858,7 +667,6 @@ impl Service {
let mut interval = interval_at(Instant::now(), update_interval);
let num_fallbacks = self.config().endpoints.len() - 1;
let update_future = async move {
loop {
interval.tick().await;
@@ -866,15 +674,6 @@ impl Service {
}
};
// Set the number of configured eth1 servers
metrics::set_gauge(&metrics::ETH1_FALLBACK_CONFIGURED, num_fallbacks as i64);
// Since we lazily update eth1 fallbacks, it's not possible to know connection status of fallback.
// Hence, we set it to 1 if we have atleast one configured fallback.
if num_fallbacks > 0 {
metrics::set_gauge(&metrics::ETH1_FALLBACK_CONNECTED, 1);
} else {
metrics::set_gauge(&metrics::ETH1_FALLBACK_CONNECTED, 0);
}
handle.spawn(update_future, "eth1");
}
@@ -904,7 +703,7 @@ impl Service {
remote_highest_block_number: u64,
remote_highest_block_timestamp: Option<u64>,
head_type: HeadType,
) -> Result<Option<RangeInclusive<u64>>, SingleEndpointError> {
) -> Result<Option<RangeInclusive<u64>>, Error> {
let follow_distance = self.cache_follow_distance();
let latest_cached_block = self.latest_cached_block();
let next_required_block = match head_type {
@@ -948,8 +747,8 @@ impl Service {
pub async fn update_deposit_cache(
&self,
new_block_numbers: Option<Option<RangeInclusive<u64>>>,
endpoints: &EndpointsCache,
) -> Result<DepositCacheUpdateOutcome, Error> {
let client = self.client();
let deposit_contract_address = self.config().deposit_contract_address.clone();
let blocks_per_log_query = self.config().blocks_per_log_query;
@@ -961,13 +760,10 @@ impl Service {
let range = {
match new_block_numbers {
Some(range) => range,
None => endpoints
.first_success(|e| async move {
relevant_new_block_numbers_from_endpoint(e, self, HeadType::Deposit).await
})
.await
.map(|(res, _)| res)
.map_err(Error::FallbackError)?,
None => {
relevant_new_block_numbers_from_endpoint(client, self, HeadType::Deposit)
.await?
}
}
};
@@ -1001,20 +797,14 @@ impl Service {
* Step 1. Download logs.
*/
let block_range_ref = &block_range;
let logs = endpoints
.first_success(|endpoint| async move {
endpoint
.get_deposit_logs_in_range(
deposit_contract_address_ref,
block_range_ref.clone(),
Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS),
)
.await
.map_err(SingleEndpointError::GetDepositLogsFailed)
})
let logs = client
.get_deposit_logs_in_range(
deposit_contract_address_ref,
block_range_ref.clone(),
Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS),
)
.await
.map(|(res, _)| res)
.map_err(Error::FallbackError)?;
.map_err(Error::GetDepositLogsFailed)?;
/*
* Step 2. Import logs to cache.
@@ -1050,7 +840,7 @@ impl Service {
logs_imported += 1;
}
Ok(())
Ok::<_, Error>(())
})?;
debug!(
@@ -1105,8 +895,8 @@ impl Service {
pub async fn update_block_cache(
&self,
new_block_numbers: Option<Option<RangeInclusive<u64>>>,
endpoints: &EndpointsCache,
) -> Result<BlockCacheUpdateOutcome, Error> {
let client = self.client();
let block_cache_truncation = self.config().block_cache_truncation;
let max_blocks_per_update = self
.config()
@@ -1116,14 +906,10 @@ impl Service {
let range = {
match new_block_numbers {
Some(range) => range,
None => endpoints
.first_success(|e| async move {
relevant_new_block_numbers_from_endpoint(e, self, HeadType::BlockCache)
.await
})
.await
.map(|(res, _)| res)
.map_err(Error::FallbackError)?,
None => {
relevant_new_block_numbers_from_endpoint(client, self, HeadType::BlockCache)
.await?
}
}
};
@@ -1183,13 +969,8 @@ impl Service {
let mut blocks_imported = 0;
for block_number in required_block_numbers {
let eth1_block = endpoints
.first_success(|e| async move {
download_eth1_block(e, self.inner.clone(), Some(block_number)).await
})
.await
.map(|(res, _)| res)
.map_err(Error::FallbackError)?;
let eth1_block =
download_eth1_block(client, self.inner.clone(), Some(block_number)).await?;
self.inner
.block_cache
@@ -1269,7 +1050,7 @@ fn relevant_block_range(
cache_follow_distance: u64,
latest_cached_block: Option<&Eth1Block>,
spec: &ChainSpec,
) -> Result<Option<RangeInclusive<u64>>, SingleEndpointError> {
) -> Result<Option<RangeInclusive<u64>>, Error> {
// If the latest cached block is lagging the head block by more than `cache_follow_distance`
// times the expected block time then the eth1 block time is likely quite different from what we
// assumed.
@@ -1304,7 +1085,7 @@ fn relevant_block_range(
//
// We assume that the `cache_follow_distance` should be sufficient to ensure this never
// happens, otherwise it is an error.
Err(SingleEndpointError::RemoteNotSynced {
Err(Error::RemoteNotSynced {
next_required_block,
remote_highest_block: remote_highest_block_number,
cache_follow_distance,
@@ -1325,7 +1106,7 @@ async fn download_eth1_block(
endpoint: &HttpJsonRpc,
cache: Arc<Inner>,
block_number_opt: Option<u64>,
) -> Result<Eth1Block, SingleEndpointError> {
) -> Result<Eth1Block, Error> {
let deposit_root = block_number_opt.and_then(|block_number| {
cache
.deposit_cache
@@ -1350,7 +1131,7 @@ async fn download_eth1_block(
.unwrap_or_else(|| BlockQuery::Latest),
Duration::from_millis(GET_BLOCK_TIMEOUT_MILLIS),
)
.map_err(SingleEndpointError::BlockDownloadFailed)
.map_err(Error::BlockDownloadFailed)
.await?;
Ok(Eth1Block {

View File

@@ -117,10 +117,9 @@ mod eth1_cache {
let initial_block_number = get_block_number(&web3).await;
let config = Config {
endpoints: Eth1Endpoint::NoAuth(vec![SensitiveUrl::parse(
eth1.endpoint().as_str(),
)
.unwrap()]),
endpoint: Eth1Endpoint::NoAuth(
SensitiveUrl::parse(eth1.endpoint().as_str()).unwrap(),
),
deposit_contract_address: deposit_contract.address(),
lowest_cached_block_number: initial_block_number,
follow_distance,
@@ -128,7 +127,8 @@ mod eth1_cache {
};
let cache_follow_distance = config.cache_follow_distance();
let service = Service::new(config, log.clone(), MainnetEthSpec::default_spec());
let service =
Service::new(config, log.clone(), MainnetEthSpec::default_spec()).unwrap();
// Create some blocks and then consume them, performing the test `rounds` times.
for round in 0..2 {
@@ -149,19 +149,17 @@ mod eth1_cache {
eth1.ganache.evm_mine().await.expect("should mine block");
}
let endpoints = service.init_endpoints().unwrap();
service
.update_deposit_cache(None, &endpoints)
.update_deposit_cache(None)
.await
.expect("should update deposit cache");
service
.update_block_cache(None, &endpoints)
.update_block_cache(None)
.await
.expect("should update block cache");
service
.update_block_cache(None, &endpoints)
.update_block_cache(None)
.await
.expect("should update cache when nothing has changed");
@@ -201,10 +199,9 @@ mod eth1_cache {
let service = Service::new(
Config {
endpoints: Eth1Endpoint::NoAuth(vec![SensitiveUrl::parse(
eth1.endpoint().as_str(),
)
.unwrap()]),
endpoint: Eth1Endpoint::NoAuth(
SensitiveUrl::parse(eth1.endpoint().as_str()).unwrap(),
),
deposit_contract_address: deposit_contract.address(),
lowest_cached_block_number: get_block_number(&web3).await,
follow_distance: 0,
@@ -213,7 +210,8 @@ mod eth1_cache {
},
log,
MainnetEthSpec::default_spec(),
);
)
.unwrap();
let blocks = cache_len * 2;
@@ -221,14 +219,12 @@ mod eth1_cache {
eth1.ganache.evm_mine().await.expect("should mine block")
}
let endpoints = service.init_endpoints().unwrap();
service
.update_deposit_cache(None, &endpoints)
.update_deposit_cache(None)
.await
.expect("should update deposit cache");
service
.update_block_cache(None, &endpoints)
.update_block_cache(None)
.await
.expect("should update block cache");
@@ -258,10 +254,9 @@ mod eth1_cache {
let service = Service::new(
Config {
endpoints: Eth1Endpoint::NoAuth(vec![SensitiveUrl::parse(
eth1.endpoint().as_str(),
)
.unwrap()]),
endpoint: Eth1Endpoint::NoAuth(
SensitiveUrl::parse(eth1.endpoint().as_str()).unwrap(),
),
deposit_contract_address: deposit_contract.address(),
lowest_cached_block_number: get_block_number(&web3).await,
follow_distance: 0,
@@ -270,19 +265,19 @@ mod eth1_cache {
},
log,
MainnetEthSpec::default_spec(),
);
)
.unwrap();
for _ in 0..4u8 {
for _ in 0..cache_len / 2 {
eth1.ganache.evm_mine().await.expect("should mine block")
}
let endpoints = service.init_endpoints().unwrap();
service
.update_deposit_cache(None, &endpoints)
.update_deposit_cache(None)
.await
.expect("should update deposit cache");
service
.update_block_cache(None, &endpoints)
.update_block_cache(None)
.await
.expect("should update block cache");
}
@@ -311,10 +306,9 @@ mod eth1_cache {
let service = Service::new(
Config {
endpoints: Eth1Endpoint::NoAuth(vec![SensitiveUrl::parse(
eth1.endpoint().as_str(),
)
.unwrap()]),
endpoint: Eth1Endpoint::NoAuth(
SensitiveUrl::parse(eth1.endpoint().as_str()).unwrap(),
),
deposit_contract_address: deposit_contract.address(),
lowest_cached_block_number: get_block_number(&web3).await,
follow_distance: 0,
@@ -322,21 +316,21 @@ mod eth1_cache {
},
log,
MainnetEthSpec::default_spec(),
);
)
.unwrap();
for _ in 0..n {
eth1.ganache.evm_mine().await.expect("should mine block")
}
let endpoints = service.init_endpoints().unwrap();
futures::try_join!(
service.update_deposit_cache(None, &endpoints),
service.update_deposit_cache(None, &endpoints)
service.update_deposit_cache(None),
service.update_deposit_cache(None)
)
.expect("should perform two simultaneous updates of deposit cache");
futures::try_join!(
service.update_block_cache(None, &endpoints),
service.update_block_cache(None, &endpoints)
service.update_block_cache(None),
service.update_block_cache(None)
)
.expect("should perform two simultaneous updates of block cache");
@@ -366,10 +360,9 @@ mod deposit_tree {
let service = Service::new(
Config {
endpoints: Eth1Endpoint::NoAuth(vec![SensitiveUrl::parse(
eth1.endpoint().as_str(),
)
.unwrap()]),
endpoint: Eth1Endpoint::NoAuth(
SensitiveUrl::parse(eth1.endpoint().as_str()).unwrap(),
),
deposit_contract_address: deposit_contract.address(),
deposit_contract_deploy_block: start_block,
follow_distance: 0,
@@ -377,7 +370,8 @@ mod deposit_tree {
},
log,
MainnetEthSpec::default_spec(),
);
)
.unwrap();
for round in 0..3 {
let deposits: Vec<_> = (0..n).map(|_| random_deposit_data()).collect();
@@ -389,15 +383,13 @@ mod deposit_tree {
.expect("should perform a deposit");
}
let endpoints = service.init_endpoints().unwrap();
service
.update_deposit_cache(None, &endpoints)
.update_deposit_cache(None)
.await
.expect("should perform update");
service
.update_deposit_cache(None, &endpoints)
.update_deposit_cache(None)
.await
.expect("should perform update when nothing has changed");
@@ -449,10 +441,9 @@ mod deposit_tree {
let service = Service::new(
Config {
endpoints: Eth1Endpoint::NoAuth(vec![SensitiveUrl::parse(
eth1.endpoint().as_str(),
)
.unwrap()]),
endpoint: Eth1Endpoint::NoAuth(
SensitiveUrl::parse(eth1.endpoint().as_str()).unwrap(),
),
deposit_contract_address: deposit_contract.address(),
deposit_contract_deploy_block: start_block,
lowest_cached_block_number: start_block,
@@ -461,7 +452,8 @@ mod deposit_tree {
},
log,
MainnetEthSpec::default_spec(),
);
)
.unwrap();
let deposits: Vec<_> = (0..n).map(|_| random_deposit_data()).collect();
@@ -472,10 +464,9 @@ mod deposit_tree {
.expect("should perform a deposit");
}
let endpoints = service.init_endpoints().unwrap();
futures::try_join!(
service.update_deposit_cache(None, &endpoints),
service.update_deposit_cache(None, &endpoints)
service.update_deposit_cache(None),
service.update_deposit_cache(None)
)
.expect("should perform two updates concurrently");
@@ -706,10 +697,9 @@ mod fast {
let now = get_block_number(&web3).await;
let service = Service::new(
Config {
endpoints: Eth1Endpoint::NoAuth(vec![SensitiveUrl::parse(
eth1.endpoint().as_str(),
)
.unwrap()]),
endpoint: Eth1Endpoint::NoAuth(
SensitiveUrl::parse(eth1.endpoint().as_str()).unwrap(),
),
deposit_contract_address: deposit_contract.address(),
deposit_contract_deploy_block: now,
lowest_cached_block_number: now,
@@ -719,7 +709,8 @@ mod fast {
},
log,
MainnetEthSpec::default_spec(),
);
)
.unwrap();
let client = HttpJsonRpc::new(SensitiveUrl::parse(&eth1.endpoint()).unwrap()).unwrap();
let n = 10;
let deposits: Vec<_> = (0..n).map(|_| random_deposit_data()).collect();
@@ -732,9 +723,8 @@ mod fast {
eth1.ganache.evm_mine().await.expect("should mine block");
}
let endpoints = service.init_endpoints().unwrap();
service
.update_deposit_cache(None, &endpoints)
.update_deposit_cache(None)
.await
.expect("should perform update");
@@ -787,10 +777,9 @@ mod persist {
let now = get_block_number(&web3).await;
let config = Config {
endpoints: Eth1Endpoint::NoAuth(vec![SensitiveUrl::parse(
eth1.endpoint().as_str(),
)
.unwrap()]),
endpoint: Eth1Endpoint::NoAuth(
SensitiveUrl::parse(eth1.endpoint().as_str()).unwrap(),
),
deposit_contract_address: deposit_contract.address(),
deposit_contract_deploy_block: now,
lowest_cached_block_number: now,
@@ -798,7 +787,8 @@ mod persist {
block_cache_truncation: None,
..Config::default()
};
let service = Service::new(config.clone(), log.clone(), MainnetEthSpec::default_spec());
let service =
Service::new(config.clone(), log.clone(), MainnetEthSpec::default_spec()).unwrap();
let n = 10;
let deposits: Vec<_> = (0..n).map(|_| random_deposit_data()).collect();
for deposit in &deposits {
@@ -808,9 +798,8 @@ mod persist {
.expect("should perform a deposit");
}
let endpoints = service.init_endpoints().unwrap();
service
.update_deposit_cache(None, &endpoints)
.update_deposit_cache(None)
.await
.expect("should perform update");
@@ -822,7 +811,7 @@ mod persist {
let deposit_count = service.deposit_cache_len();
service
.update_block_cache(None, &endpoints)
.update_block_cache(None)
.await
.expect("should perform update");
@@ -855,228 +844,3 @@ mod persist {
.await;
}
}
/// Tests for eth1 fallback
mod fallbacks {
use super::*;
use tokio::time::sleep;
#[tokio::test]
async fn test_fallback_when_offline() {
async {
let log = null_logger();
let endpoint2 = new_ganache_instance()
.await
.expect("should start eth1 environment");
let deposit_contract = &endpoint2.deposit_contract;
let initial_block_number = get_block_number(&endpoint2.web3()).await;
// Create some blocks and then consume them, performing the test `rounds` times.
let new_blocks = 4;
for _ in 0..new_blocks {
endpoint2
.ganache
.evm_mine()
.await
.expect("should mine block");
}
let endpoint1 = endpoint2
.ganache
.fork()
.expect("should start eth1 environment");
//mine additional blocks on top of the original endpoint
for _ in 0..new_blocks {
endpoint2
.ganache
.evm_mine()
.await
.expect("should mine block");
}
let service = Service::new(
Config {
endpoints: Eth1Endpoint::NoAuth(vec![
SensitiveUrl::parse(endpoint1.endpoint().as_str()).unwrap(),
SensitiveUrl::parse(endpoint2.endpoint().as_str()).unwrap(),
]),
deposit_contract_address: deposit_contract.address(),
lowest_cached_block_number: initial_block_number,
follow_distance: 0,
..Config::default()
},
log.clone(),
MainnetEthSpec::default_spec(),
);
let endpoint1_block_number = get_block_number(&endpoint1.web3).await;
//the first call will only query endpoint1
service.update().await.expect("should update deposit cache");
assert_eq!(
service.deposits().read().last_processed_block.unwrap(),
endpoint1_block_number
);
drop(endpoint1);
let endpoint2_block_number = get_block_number(&endpoint2.web3()).await;
assert!(endpoint1_block_number < endpoint2_block_number);
//endpoint1 is offline => query will import blocks from endpoint2
service.update().await.expect("should update deposit cache");
assert_eq!(
service.deposits().read().last_processed_block.unwrap(),
endpoint2_block_number
);
}
.await;
}
#[tokio::test]
async fn test_fallback_when_wrong_chain_id() {
async {
let log = null_logger();
let correct_chain_id: u64 = DEFAULT_CHAIN_ID.into();
let wrong_chain_id = correct_chain_id + 1;
let endpoint1 = GanacheEth1Instance::new(wrong_chain_id)
.await
.expect("should start eth1 environment");
let endpoint2 = new_ganache_instance()
.await
.expect("should start eth1 environment");
let deposit_contract = &endpoint2.deposit_contract;
let initial_block_number = get_block_number(&endpoint2.web3()).await;
// Create some blocks and then consume them, performing the test `rounds` times.
let new_blocks = 4;
for _ in 0..new_blocks {
endpoint1
.ganache
.evm_mine()
.await
.expect("should mine block");
endpoint2
.ganache
.evm_mine()
.await
.expect("should mine block");
}
//additional blocks for endpoint1 to be able to distinguish
for _ in 0..new_blocks {
endpoint1
.ganache
.evm_mine()
.await
.expect("should mine block");
}
let service = Service::new(
Config {
endpoints: Eth1Endpoint::NoAuth(vec![
SensitiveUrl::parse(endpoint2.endpoint().as_str()).unwrap(),
SensitiveUrl::parse(endpoint1.endpoint().as_str()).unwrap(),
]),
deposit_contract_address: deposit_contract.address(),
lowest_cached_block_number: initial_block_number,
follow_distance: 0,
..Config::default()
},
log.clone(),
MainnetEthSpec::default_spec(),
);
let endpoint1_block_number = get_block_number(&endpoint1.web3()).await;
let endpoint2_block_number = get_block_number(&endpoint2.web3()).await;
assert!(endpoint2_block_number < endpoint1_block_number);
//the call will fallback to endpoint2
service.update().await.expect("should update deposit cache");
assert_eq!(
service.deposits().read().last_processed_block.unwrap(),
endpoint2_block_number
);
}
.await;
}
#[tokio::test]
async fn test_fallback_when_node_far_behind() {
async {
let log = null_logger();
let endpoint2 = new_ganache_instance()
.await
.expect("should start eth1 environment");
let deposit_contract = &endpoint2.deposit_contract;
let initial_block_number = get_block_number(&endpoint2.web3()).await;
// Create some blocks and then consume them, performing the test `rounds` times.
let new_blocks = 4;
for _ in 0..new_blocks {
endpoint2
.ganache
.evm_mine()
.await
.expect("should mine block");
}
let endpoint1 = endpoint2
.ganache
.fork()
.expect("should start eth1 environment");
let service = Service::new(
Config {
endpoints: Eth1Endpoint::NoAuth(vec![
SensitiveUrl::parse(endpoint1.endpoint().as_str()).unwrap(),
SensitiveUrl::parse(endpoint2.endpoint().as_str()).unwrap(),
]),
deposit_contract_address: deposit_contract.address(),
lowest_cached_block_number: initial_block_number,
follow_distance: 0,
node_far_behind_seconds: 5,
..Config::default()
},
log.clone(),
MainnetEthSpec::default_spec(),
);
let endpoint1_block_number = get_block_number(&endpoint1.web3).await;
//the first call will only query endpoint1
service.update().await.expect("should update deposit cache");
assert_eq!(
service.deposits().read().last_processed_block.unwrap(),
endpoint1_block_number
);
sleep(Duration::from_secs(7)).await;
//both endpoints don't have recent blocks => should return error
assert!(service.update().await.is_err());
//produce some new blocks on endpoint2
for _ in 0..new_blocks {
endpoint2
.ganache
.evm_mine()
.await
.expect("should mine block");
}
let endpoint2_block_number = get_block_number(&endpoint2.web3()).await;
//endpoint1 is far behind + endpoint2 not => update will import blocks from endpoint2
service.update().await.expect("should update deposit cache");
assert_eq!(
service.deposits().read().last_processed_block.unwrap(),
endpoint2_block_number
);
}
.await;
}
}