Deposit Cache Finalization & Fast WS Sync (#2915)

## Summary

The deposit cache now has the ability to finalize deposits. This will cause it to drop unneeded deposit logs and hashes in the deposit Merkle tree that are no longer required to construct deposit proofs. The cache is finalized whenever the latest finalized checkpoint has a new `Eth1Data` with all deposits imported.

This has three benefits:

1. Improves the speed of constructing Merkle proofs for deposits as we can just replay deposits since the last finalized checkpoint instead of all historical deposits when re-constructing the Merkle tree.
2. Significantly faster weak subjectivity sync as the deposit cache can be transferred to the newly syncing node in compressed form. The Merkle tree that stores `N` finalized deposits requires a maximum of `log2(N)` hashes. The newly syncing node then only needs to download deposits since the last finalized checkpoint to have a full tree.
3. Future proofing in preparation for [EIP-4444](https://eips.ethereum.org/EIPS/eip-4444) as execution nodes will no longer be required to store logs permanently so we won't always have all historical logs available to us.

## More Details

The image below illustrates how the deposit contract Merkle tree evolves and finalizes, along with the resulting `DepositTreeSnapshot`:
![image](https://user-images.githubusercontent.com/37123614/151465302-5fc56284-8a69-4998-b20e-45db3934ac70.png)

## Other Considerations

I've changed the structure of the `SszDepositCache`, so once you load & save your database with this version of Lighthouse, you will no longer be able to load it with older versions.

Co-authored-by: ethDreamer <37123614+ethDreamer@users.noreply.github.com>
This commit is contained in:
ethDreamer
2022-10-30 04:04:24 +00:00
parent 46fbf5b98b
commit e8604757a2
35 changed files with 2302 additions and 171 deletions

View File

@@ -1,7 +1,10 @@
use ssz_derive::{Decode, Encode};
use std::collections::HashMap;
use std::ops::RangeInclusive;
pub use eth2::lighthouse::Eth1Block;
use eth2::types::Hash256;
use std::sync::Arc;
#[derive(Debug, PartialEq, Clone)]
pub enum Error {
@@ -20,7 +23,9 @@ pub enum Error {
/// timestamp.
#[derive(Debug, PartialEq, Clone, Default, Encode, Decode)]
pub struct BlockCache {
blocks: Vec<Eth1Block>,
blocks: Vec<Arc<Eth1Block>>,
#[ssz(skip_serializing, skip_deserializing)]
by_hash: HashMap<Hash256, Arc<Eth1Block>>,
}
impl BlockCache {
@@ -36,12 +41,12 @@ impl BlockCache {
/// Returns the earliest (lowest timestamp) block, if any.
pub fn earliest_block(&self) -> Option<&Eth1Block> {
self.blocks.first()
self.blocks.first().map(|ptr| ptr.as_ref())
}
/// Returns the latest (highest timestamp) block, if any.
pub fn latest_block(&self) -> Option<&Eth1Block> {
self.blocks.last()
self.blocks.last().map(|ptr| ptr.as_ref())
}
/// Returns the timestamp of the earliest block in the cache (if any).
@@ -71,7 +76,7 @@ impl BlockCache {
/// - Monotonically increasing block numbers.
/// - Non-uniformly increasing block timestamps.
pub fn iter(&self) -> impl DoubleEndedIterator<Item = &Eth1Block> + Clone {
self.blocks.iter()
self.blocks.iter().map(|ptr| ptr.as_ref())
}
/// Shortens the cache, keeping the latest (by block number) `len` blocks while dropping the
@@ -80,7 +85,11 @@ impl BlockCache {
/// If `len` is greater than the vector's current length, this has no effect.
pub fn truncate(&mut self, len: usize) {
if len < self.blocks.len() {
self.blocks = self.blocks.split_off(self.blocks.len() - len);
let remaining = self.blocks.split_off(self.blocks.len() - len);
for block in &self.blocks {
self.by_hash.remove(&block.hash);
}
self.blocks = remaining;
}
}
@@ -92,12 +101,27 @@ impl BlockCache {
/// Returns a block with the corresponding number, if any.
pub fn block_by_number(&self, block_number: u64) -> Option<&Eth1Block> {
self.blocks.get(
self.blocks
.as_slice()
.binary_search_by(|block| block.number.cmp(&block_number))
.ok()?,
)
self.blocks
.get(
self.blocks
.as_slice()
.binary_search_by(|block| block.number.cmp(&block_number))
.ok()?,
)
.map(|ptr| ptr.as_ref())
}
/// Returns a block with the corresponding hash, if any.
///
/// Uses the `by_hash` map for a direct hash-map lookup, avoiding the
/// binary search over `blocks` that `block_by_number` performs.
pub fn block_by_hash(&self, block_hash: &Hash256) -> Option<&Eth1Block> {
self.by_hash.get(block_hash).map(|ptr| ptr.as_ref())
}
/// Rebuilds the `by_hash` index from the `blocks` vector.
///
/// Required after SSZ-decoding a `BlockCache`: `by_hash` is marked
/// `#[ssz(skip_serializing, skip_deserializing)]`, so it comes back empty
/// from disk and must be reconstructed before hash lookups will work.
pub fn rebuild_by_hash_map(&mut self) {
self.by_hash.clear();
for block in self.blocks.iter() {
// `block` is an `Arc<Eth1Block>`; cloning only bumps the refcount.
self.by_hash.insert(block.hash, block.clone());
}
}
/// Insert an `Eth1Snapshot` into `self`, allowing future queries.
@@ -161,7 +185,9 @@ impl BlockCache {
}
}
self.blocks.push(block);
let ptr = Arc::new(block);
self.by_hash.insert(ptr.hash, ptr.clone());
self.blocks.push(ptr);
Ok(())
}
@@ -269,6 +295,8 @@ mod tests {
.expect("should add consecutive blocks with duplicate timestamps");
}
let blocks = blocks.into_iter().map(Arc::new).collect::<Vec<_>>();
assert_eq!(cache.blocks, blocks, "should have added all blocks");
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -2,14 +2,15 @@ use crate::service::endpoint_from_config;
use crate::Config;
use crate::{
block_cache::{BlockCache, Eth1Block},
deposit_cache::{DepositCache, SszDepositCache},
deposit_cache::{DepositCache, SszDepositCache, SszDepositCacheV1, SszDepositCacheV13},
};
use execution_layer::HttpJsonRpc;
use parking_lot::RwLock;
use ssz::four_byte_option_impl;
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use types::ChainSpec;
use superstruct::superstruct;
use types::{ChainSpec, DepositTreeSnapshot, Eth1Data};
// Define "legacy" implementations of `Option<u64>` which use four bytes for encoding the union
// selector.
@@ -29,12 +30,25 @@ impl DepositUpdater {
last_processed_block: None,
}
}
/// Builds a `DepositUpdater` whose cache is seeded from a finalized
/// `DepositTreeSnapshot` instead of being replayed from historical logs.
///
/// `last_processed_block` is set to the snapshot's execution block height
/// so that log syncing resumes from the snapshot point. Returns `Err` if
/// the underlying `DepositCache` cannot be constructed from the snapshot.
pub fn from_snapshot(
deposit_contract_deploy_block: u64,
snapshot: &DepositTreeSnapshot,
) -> Result<Self, String> {
let last_processed_block = Some(snapshot.execution_block_height);
Ok(Self {
cache: DepositCache::from_deposit_snapshot(deposit_contract_deploy_block, snapshot)?,
last_processed_block,
})
}
}
pub struct Inner {
pub block_cache: RwLock<BlockCache>,
pub deposit_cache: RwLock<DepositUpdater>,
pub endpoint: HttpJsonRpc,
// this gets set to Some(Eth1Data) when the deposit finalization conditions are met
pub to_finalize: RwLock<Option<Eth1Data>>,
pub config: RwLock<Config>,
pub remote_head_block: RwLock<Option<Eth1Block>>,
pub spec: ChainSpec,
@@ -58,9 +72,13 @@ impl Inner {
/// Recover `Inner` given byte representation of eth1 deposit and block caches.
pub fn from_bytes(bytes: &[u8], config: Config, spec: ChainSpec) -> Result<Self, String> {
let ssz_cache = SszEth1Cache::from_ssz_bytes(bytes)
.map_err(|e| format!("Ssz decoding error: {:?}", e))?;
ssz_cache.to_inner(config, spec)
SszEth1Cache::from_ssz_bytes(bytes)
.map_err(|e| format!("Ssz decoding error: {:?}", e))?
.to_inner(config, spec)
.map(|inner| {
inner.block_cache.write().rebuild_by_hash_map();
inner
})
}
/// Returns a reference to the specification.
@@ -69,12 +87,21 @@ impl Inner {
}
}
#[derive(Encode, Decode, Clone)]
pub type SszEth1Cache = SszEth1CacheV13;
#[superstruct(
variants(V1, V13),
variant_attributes(derive(Encode, Decode, Clone)),
no_enum
)]
pub struct SszEth1Cache {
block_cache: BlockCache,
deposit_cache: SszDepositCache,
pub block_cache: BlockCache,
#[superstruct(only(V1))]
pub deposit_cache: SszDepositCacheV1,
#[superstruct(only(V13))]
pub deposit_cache: SszDepositCacheV13,
#[ssz(with = "four_byte_option_u64")]
last_processed_block: Option<u64>,
pub last_processed_block: Option<u64>,
}
impl SszEth1Cache {
@@ -97,6 +124,7 @@ impl SszEth1Cache {
}),
endpoint: endpoint_from_config(&config)
.map_err(|e| format!("Failed to create endpoint: {:?}", e))?,
to_finalize: RwLock::new(None),
// Set the remote head_block zero when creating a new instance. We only care about
// present and future eth1 nodes.
remote_head_block: RwLock::new(None),

View File

@@ -8,9 +8,9 @@ mod metrics;
mod service;
pub use block_cache::{BlockCache, Eth1Block};
pub use deposit_cache::DepositCache;
pub use deposit_cache::{DepositCache, SszDepositCache, SszDepositCacheV1, SszDepositCacheV13};
pub use execution_layer::http::deposit_log::DepositLog;
pub use inner::SszEth1Cache;
pub use inner::{SszEth1Cache, SszEth1CacheV1, SszEth1CacheV13};
pub use service::{
BlockCacheUpdateOutcome, Config, DepositCacheUpdateOutcome, Error, Eth1Endpoint, Service,
DEFAULT_CHAIN_ID,

View File

@@ -20,7 +20,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::{interval_at, Duration, Instant};
use types::{ChainSpec, EthSpec, Unsigned};
use types::{ChainSpec, DepositTreeSnapshot, Eth1Data, EthSpec, Unsigned};
/// Indicates the default eth1 chain id we use for the deposit contract.
pub const DEFAULT_CHAIN_ID: Eth1Id = Eth1Id::Goerli;
@@ -63,7 +63,13 @@ async fn endpoint_state(
config_chain_id: &Eth1Id,
log: &Logger,
) -> EndpointState {
let error_connecting = |e| {
let error_connecting = |e: String| {
debug!(
log,
"eth1 endpoint error";
"endpoint" => %endpoint,
"error" => &e,
);
warn!(
log,
"Error connecting to eth1 node endpoint";
@@ -213,6 +219,10 @@ pub enum Error {
GetDepositLogsFailed(String),
/// There was an unexpected internal error.
Internal(String),
/// Error finalizing deposit
FailedToFinalizeDeposit(String),
/// There was a problem Initializing from deposit snapshot
FailedToInitializeFromSnapshot(String),
}
/// The success message for an Eth1Data cache update.
@@ -395,6 +405,7 @@ impl Service {
config.deposit_contract_deploy_block,
)),
endpoint: endpoint_from_config(&config)?,
to_finalize: RwLock::new(None),
remote_head_block: RwLock::new(None),
config: RwLock::new(config),
spec,
@@ -407,6 +418,36 @@ impl Service {
&self.inner.endpoint
}
/// Creates a new service, initializing the deposit tree from a snapshot.
///
/// Unlike a fresh service, the deposit cache starts from the finalized
/// `DepositTreeSnapshot`, so only deposits made after the snapshot need to
/// be downloaded from the eth1 node.
///
/// # Errors
///
/// Returns `Error::FailedToInitializeFromSnapshot` if the snapshot cannot
/// be turned into a `DepositUpdater`, or if the endpoint cannot be created
/// from `config`.
pub fn from_deposit_snapshot(
config: Config,
log: Logger,
spec: ChainSpec,
deposit_snapshot: &DepositTreeSnapshot,
) -> Result<Self, Error> {
let deposit_cache =
DepositUpdater::from_snapshot(config.deposit_contract_deploy_block, deposit_snapshot)
.map_err(Error::FailedToInitializeFromSnapshot)?;
Ok(Self {
inner: Arc::new(Inner {
block_cache: <_>::default(),
deposit_cache: RwLock::new(deposit_cache),
endpoint: endpoint_from_config(&config)
.map_err(Error::FailedToInitializeFromSnapshot)?,
// No finalization pending on a freshly-created service.
to_finalize: RwLock::new(None),
remote_head_block: RwLock::new(None),
config: RwLock::new(config),
spec,
}),
log,
})
}
/// Records the `Eth1Data` that the next cache update should attempt to
/// finalize the deposit tree against; `None` clears any pending request.
pub fn set_to_finalize(&self, eth1_data: Option<Eth1Data>) {
*(self.inner.to_finalize.write()) = eth1_data;
}
/// Returns the follow distance that has been shortened to accommodate for differences in the
/// spacing between blocks.
///
@@ -521,7 +562,7 @@ impl Service {
let deposits = self.deposits().read();
deposits
.cache
.get_valid_signature_count(deposits.cache.latest_block_number()?)
.get_valid_signature_count(deposits.cache.latest_block_number())
}
/// Returns the number of deposits with valid signatures that have been observed up to and
@@ -619,7 +660,8 @@ impl Service {
"old_block_number" => deposit_cache.last_processed_block,
"new_block_number" => deposit_cache.cache.latest_block_number(),
);
deposit_cache.last_processed_block = deposit_cache.cache.latest_block_number();
deposit_cache.last_processed_block =
Some(deposit_cache.cache.latest_block_number());
}
let outcome =
@@ -698,6 +740,37 @@ impl Service {
"deposits" => format!("{:?}", deposit),
),
};
let optional_eth1data = self.inner.to_finalize.write().take();
if let Some(eth1data_to_finalize) = optional_eth1data {
let already_finalized = self
.inner
.deposit_cache
.read()
.cache
.finalized_deposit_count();
let deposit_count_to_finalize = eth1data_to_finalize.deposit_count;
if deposit_count_to_finalize > already_finalized {
match self.finalize_deposits(eth1data_to_finalize) {
Err(e) => error!(
self.log,
"Failed to finalize deposit cache";
"error" => ?e,
),
Ok(()) => info!(
self.log,
"Successfully finalized deposit tree";
"finalized deposit count" => deposit_count_to_finalize,
),
}
} else {
debug!(
self.log,
"Deposits tree already finalized";
"already_finalized" => already_finalized,
"deposit_count_to_finalize" => deposit_count_to_finalize,
);
}
}
Ok(())
}
@@ -733,6 +806,30 @@ impl Service {
)
}
/// Finalizes the deposit cache at the block referenced by `eth1_data`.
///
/// # Errors
///
/// Returns `Error::FailedToFinalizeDeposit` if the block identified by
/// `eth1_data.block_hash` is not present in the block cache, or if the
/// underlying cache's `finalize` call fails.
pub fn finalize_deposits(&self, eth1_data: Eth1Data) -> Result<(), Error> {
// Clone the block out of the cache so we don't hold a borrow of the
// read-lock guard while taking the deposit-cache write lock below.
let eth1_block = self
.inner
.block_cache
.read()
.block_by_hash(&eth1_data.block_hash)
.cloned()
.ok_or_else(|| {
Error::FailedToFinalizeDeposit(
"Finalized block not found in block cache".to_string(),
)
})?;
self.inner
.deposit_cache
.write()
.cache
.finalize(eth1_block)
.map_err(|e| Error::FailedToFinalizeDeposit(format!("{:?}", e)))
}
/// Returns the deposit cache's current `DepositTreeSnapshot`, if one is
/// available.
pub fn get_deposit_snapshot(&self) -> Option<DepositTreeSnapshot> {
self.inner.deposit_cache.read().cache.get_deposit_snapshot()
}
/// Contacts the remote eth1 node and attempts to import deposit logs up to the configured
/// follow-distance block.
///