Improve eth1 block sync (#2008)

## Issue Addressed

NA

## Proposed Changes

- Log about eth1 whilst waiting for genesis.
- For the block and deposit caches, update them after each download instead of when *all* downloads are complete.
  - This prevents the case where a single timeout error can cause us to drop *all* previously downloaded blocks/deposits.
- Set `max_log_requests_per_update` to avoid timeouts due to very large log counts in a response.
- Set `max_blocks_per_update` to prevent a single update of the block cache from downloading an unreasonable number of blocks.
  - This shouldn't have any effect in normal use; it's just a safeguard against bugs.
- Increase the timeout for eth1 calls from 15s to 60s, as per @pawanjay176's experience with Infura.

## Additional Info

NA
This commit is contained in:
Paul Hauner
2020-11-30 20:29:17 +00:00
parent 8fcd22992c
commit 77f3539654
5 changed files with 176 additions and 105 deletions

View File

@@ -71,23 +71,52 @@ fn get_sync_status<T: EthSpec>(
latest_cached_block: Option<&Eth1Block>,
head_block: Option<&Eth1Block>,
genesis_time: u64,
current_slot: Slot,
current_slot: Option<Slot>,
spec: &ChainSpec,
) -> Option<Eth1SyncStatusData> {
let period = T::SlotsPerEth1VotingPeriod::to_u64();
// Since `period` is a "constant", we assume it is set sensibly.
let voting_period_start_slot = (current_slot / period) * period;
let voting_target_timestamp = {
let eth1_follow_distance_seconds = spec
.seconds_per_eth1_block
.saturating_mul(spec.eth1_follow_distance);
// The voting target timestamp needs to be special-cased when we're before
// genesis (as defined by `current_slot == None`).
//
// For the sake of this status, when prior to genesis we want to invent some voting periods
// that are *before* genesis, so that we can indicate to users that we're actually adequately
// cached for where they are in time.
let voting_target_timestamp = if let Some(current_slot) = current_slot {
let period = T::SlotsPerEth1VotingPeriod::to_u64();
let voting_period_start_slot = (current_slot / period) * period;
let period_start = slot_start_seconds::<T>(
genesis_time,
spec.milliseconds_per_slot,
voting_period_start_slot,
);
let eth1_follow_distance_seconds = spec
.seconds_per_eth1_block
.saturating_mul(spec.eth1_follow_distance);
period_start.saturating_sub(eth1_follow_distance_seconds)
} else {
// The number of seconds in an eth1 voting period.
let voting_period_duration =
T::slots_per_eth1_voting_period() as u64 * (spec.milliseconds_per_slot / 1_000);
let now = SystemTime::now().duration_since(UNIX_EPOCH).ok()?.as_secs();
// The number of seconds between now and genesis.
let seconds_till_genesis = genesis_time.saturating_sub(now);
// Determine how many voting periods are contained in distance between
// now and genesis, rounding up.
let voting_periods_past =
(seconds_till_genesis + voting_period_duration - 1) / voting_period_duration;
// Return the start time of the current voting period*.
//
// *: This voting period doesn't *actually* exist, we're just using it to
// give useful logs prior to genesis.
genesis_time
.saturating_sub(voting_periods_past * voting_period_duration)
.saturating_sub(eth1_follow_distance_seconds)
};
let latest_cached_block_number = latest_cached_block.map(|b| b.number);
@@ -232,7 +261,7 @@ where
pub fn sync_status(
&self,
genesis_time: u64,
current_slot: Slot,
current_slot: Option<Slot>,
spec: &ChainSpec,
) -> Option<Eth1SyncStatusData> {
get_sync_status::<E>(

View File

@@ -3,7 +3,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::NetworkGlobals;
use futures::prelude::*;
use parking_lot::Mutex;
use slog::{debug, error, info, warn};
use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -56,6 +56,7 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
"peers" => peer_count_pretty(network.connected_peers()),
"wait_time" => estimated_time_pretty(Some(next_slot.as_secs() as f64)),
);
eth1_logging(&beacon_chain, &log);
sleep(slot_duration).await;
}
_ => break,
@@ -172,37 +173,7 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
);
}
// Perform some logging about the eth1 chain
if let Some(eth1_chain) = beacon_chain.eth1_chain.as_ref() {
if let Some(status) =
eth1_chain.sync_status(head_info.genesis_time, current_slot, &beacon_chain.spec)
{
debug!(
log,
"Eth1 cache sync status";
"eth1_head_block" => status.head_block_number,
"latest_cached_block_number" => status.latest_cached_block_number,
"latest_cached_timestamp" => status.latest_cached_block_timestamp,
"voting_target_timestamp" => status.voting_target_timestamp,
"ready" => status.lighthouse_is_cached_and_ready
);
if !status.lighthouse_is_cached_and_ready {
warn!(
log,
"Syncing eth1 block cache";
"target_timestamp" => status.voting_target_timestamp,
"latest_timestamp" => status.latest_cached_block_timestamp,
"msg" => "block production temporarily impaired"
);
}
} else {
error!(
log,
"Unable to determine eth1 sync status";
);
}
}
eth1_logging(&beacon_chain, &log);
}
Ok::<(), ()>(())
};
@@ -213,6 +184,59 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
Ok(())
}
fn eth1_logging<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>, log: &Logger) {
let current_slot_opt = beacon_chain.slot().ok();
if let Ok(head_info) = beacon_chain.head_info() {
// Perform some logging about the eth1 chain
if let Some(eth1_chain) = beacon_chain.eth1_chain.as_ref() {
if let Some(status) =
eth1_chain.sync_status(head_info.genesis_time, current_slot_opt, &beacon_chain.spec)
{
debug!(
log,
"Eth1 cache sync status";
"eth1_head_block" => status.head_block_number,
"latest_cached_block_number" => status.latest_cached_block_number,
"latest_cached_timestamp" => status.latest_cached_block_timestamp,
"voting_target_timestamp" => status.voting_target_timestamp,
"ready" => status.lighthouse_is_cached_and_ready
);
if !status.lighthouse_is_cached_and_ready {
let voting_target_timestamp = status.voting_target_timestamp;
let distance = status
.latest_cached_block_timestamp
.map(|latest| {
voting_target_timestamp.saturating_sub(latest)
/ beacon_chain.spec.seconds_per_eth1_block
})
.map(|distance| distance.to_string())
.unwrap_or_else(|| "initializing deposits".to_string());
warn!(
log,
"Syncing eth1 block cache";
"msg" => "sync can take longer when using remote eth1 nodes",
"est_blocks_remaining" => distance,
);
}
} else {
error!(
log,
"Unable to determine eth1 sync status";
);
}
}
} else {
error!(
log,
"Unable to get head info";
);
}
}
/// Returns the peer count, returning something helpful if it's `usize::max_value` (effectively a
/// `None` value).
fn peer_count_pretty(peer_count: usize) -> String {

View File

@@ -4,12 +4,12 @@ use crate::{
deposit_cache::Error as DepositCacheError,
http::{
get_block, get_block_number, get_chain_id, get_deposit_logs_in_range, get_network_id,
BlockQuery, Eth1Id, Log,
BlockQuery, Eth1Id,
},
inner::{DepositUpdater, Inner},
};
use fallback::{Fallback, FallbackError};
use futures::{future::TryFutureExt, stream, stream::TryStreamExt, StreamExt};
use futures::{future::TryFutureExt, StreamExt};
use parking_lot::{RwLock, RwLockReadGuard};
use serde::{Deserialize, Serialize};
use slog::{crit, debug, error, info, trace, warn, Logger};
@@ -34,7 +34,7 @@ const BLOCK_NUMBER_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
/// Timeout when doing an eth_getBlockByNumber call.
const GET_BLOCK_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
/// Timeout when doing an eth_getLogs to read the deposit contract logs.
const GET_DEPOSIT_LOG_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
const GET_DEPOSIT_LOG_TIMEOUT_MILLIS: u64 = 60_000;
const WARNING_MSG: &str = "BLOCK PROPOSALS WILL FAIL WITHOUT VALID, SYNCED ETH1 CONNECTION";
@@ -384,8 +384,8 @@ impl Default for Config {
block_cache_truncation: Some(4_096),
auto_update_interval_millis: 7_000,
blocks_per_log_query: 1_000,
max_log_requests_per_update: None,
max_blocks_per_update: None,
max_log_requests_per_update: Some(100),
max_blocks_per_update: Some(8_192),
}
}
}
@@ -817,38 +817,40 @@ impl Service {
Vec::new()
};
let mut logs_imported: usize = 0;
let deposit_contract_address_ref: &str = &deposit_contract_address;
let logs: Vec<(Range<u64>, Vec<Log>)> =
stream::try_unfold(block_number_chunks.into_iter(), |mut chunks| async {
match chunks.next() {
Some(chunk) => {
let chunk_ref = &chunk;
endpoints
.first_success(|e| async move {
get_deposit_logs_in_range(
e,
deposit_contract_address_ref,
chunk_ref.clone(),
Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS),
)
.await
.map_err(SingleEndpointError::GetDepositLogsFailed)
})
.await
.map(|logs| Some(((chunk, logs), chunks)))
}
None => Ok(None),
}
})
.try_collect()
.await
.map_err(Error::FallbackError)?;
for block_range in block_number_chunks.into_iter() {
if block_range.is_empty() {
debug!(
self.log,
"No new blocks to scan for logs";
);
continue;
}
let mut logs_imported = 0;
for (block_range, log_chunk) in logs.iter() {
/*
* Step 1. Download logs.
*/
let block_range_ref = &block_range;
let logs = endpoints
.first_success(|e| async move {
get_deposit_logs_in_range(
e,
&deposit_contract_address_ref,
block_range_ref.clone(),
Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS),
)
.await
.map_err(SingleEndpointError::GetDepositLogsFailed)
})
.await
.map_err(Error::FallbackError)?;
/*
* Step 2. Import logs to cache.
*/
let mut cache = self.deposits().write();
log_chunk
.iter()
logs.iter()
.map(|raw_log| {
raw_log.to_deposit_log(self.inner.spec()).map_err(|error| {
Error::FailedToParseDepositLog {
@@ -881,6 +883,12 @@ impl Service {
// node to choose an invalid genesis state or propose an invalid block.
.collect::<Result<_, _>>()?;
debug!(
self.log,
"Imported deposit logs chunk";
"logs" => logs.len(),
);
cache.last_processed_block = Some(block_range.end.saturating_sub(1));
metrics::set_gauge(&metrics::DEPOSIT_CACHE_LEN, cache.cache.len() as i64);
@@ -976,8 +984,9 @@ impl Service {
} else {
Vec::new()
};
// Download the range of blocks and sequentially import them into the cache.
// Last processed block in deposit cache
// This value is used to prevent the block cache from importing a block that is not yet in
// the deposit cache.
let latest_in_cache = self
.inner
.deposit_cache
@@ -990,34 +999,26 @@ impl Service {
.filter(|x| *x <= latest_in_cache)
.take(max_blocks_per_update)
.collect::<Vec<_>>();
debug!(
self.log,
"Downloading eth1 blocks";
"first" => ?required_block_numbers.first(),
"last" => ?required_block_numbers.last(),
);
// Produce a stream from the list of required block numbers and return a future that
// consumes it.
let eth1_blocks: Vec<Eth1Block> = stream::try_unfold(
required_block_numbers.into_iter(),
|mut block_numbers| async {
match block_numbers.next() {
Some(block_number) => {
match endpoints
.first_success(|e| async move {
download_eth1_block(e, self.inner.clone(), Some(block_number)).await
})
.await
{
Ok(eth1_block) => Ok(Some((eth1_block, block_numbers))),
Err(e) => Err(e),
}
}
None => Ok(None),
}
},
)
.try_collect()
.await
.map_err(Error::FallbackError)?;
let mut blocks_imported = 0;
for eth1_block in eth1_blocks {
for block_number in required_block_numbers {
let eth1_block = endpoints
.first_success(|e| async move {
download_eth1_block(e, self.inner.clone(), Some(block_number)).await
})
.await
.map_err(Error::FallbackError)?;
self.inner
.block_cache
.write()

View File

@@ -2075,9 +2075,7 @@ pub fn serve<T: BeaconChainTypes>(
let head_info = chain
.head_info()
.map_err(warp_utils::reject::beacon_chain_error)?;
let current_slot = chain
.slot()
.map_err(warp_utils::reject::beacon_chain_error)?;
let current_slot_opt = chain.slot().ok();
chain
.eth1_chain
@@ -2088,7 +2086,7 @@ pub fn serve<T: BeaconChainTypes>(
)
})
.and_then(|eth1| {
eth1.sync_status(head_info.genesis_time, current_slot, &chain.spec)
eth1.sync_status(head_info.genesis_time, current_slot_opt, &chain.spec)
.ok_or_else(|| {
warp_utils::reject::custom_server_error(
"Unable to determine Eth1 sync status".to_string(),