Persist light client bootstrap (#5915)

* persist light client updates

* update beacon chain to serve light client updates

* resolve todos

* cache best update

* extend cache parts

* is better light client update

* resolve merge conflict

* initial api changes

* add lc update db column

* fmt

* added tests

* add sim

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into persist-light-client-updates

* fix some weird issues with the simulator

* tests

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into persist-light-client-updates

* test changes

* merge conflict

* testing

* started work on ef tests and some code clean up

* update tests

* linting

* noop pre altair, we're still failing on electra though

* allow for zeroed light client header

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into persist-light-client-updates

* merge unstable

* remove unwraps

* remove unwraps

* fetch bootstrap without always querying for state

* storing bootstrap parts in db

* more code cleanup

* test

* prune sync committee branches from dropped chains

* Update light_client_update.rs

* merge unstable

* move functionality to helper methods

* refactor is best update fn

* refactor is best update fn

* improve organization of light client server cache logic

* fork digest calc, and only spawn as many blocks as we need for the lc update test

* resolve merge conflict

* add electra bootstrap logic, add logic to cache current sync committee

* add latest sync committee branch cache

* fetch lc update from the cache if it exists

* fmt

* Fix beacon_chain tests

* Add debug code to update ranking_order ef test

* Fix compare code

* merge conflicts

* merge conflict

* add better error messaging

* resolve merge conflicts

* remove lc update from basicsim

* rename sync committee variable and fix persist condition

* refactor get_light_client_update logic

* add better comments, return helpful error messages over http and rpc

* pruning canonical non checkpoint slots

* fix test

* rerun test

* update pruning logic, add tests

* fix tests

* fix imports

* fmt

* refactor db code

* Refactor db method

* Refactor db method

* add additional comments

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into persist-light-client-bootstrap

* fix merge

* linting

* merge conflict

* prevent overflow

* enable lc server for http api tests

* fix tests

* remove prints

* remove warning

* revert change
This commit is contained in:
Eitan Seri-Levi
2024-09-09 17:27:49 -07:00
committed by GitHub
parent 51091a40fa
commit a94b12b4d5
19 changed files with 733 additions and 238 deletions

View File

@@ -59,6 +59,7 @@ pub enum Error {
state_root: Hash256,
slot: Slot,
},
ArithError(safe_arith::ArithError),
}
pub trait HandleUnavailable<T> {
@@ -129,6 +130,12 @@ impl From<EpochCacheError> for Error {
}
}
impl From<safe_arith::ArithError> for Error {
fn from(e: safe_arith::ArithError) -> Error {
Error::ArithError(e)
}
}
#[derive(Debug)]
pub struct DBError {
pub message: String,

View File

@@ -27,6 +27,7 @@ use itertools::process_results;
use leveldb::iterator::LevelDBIterator;
use lru::LruCache;
use parking_lot::{Mutex, RwLock};
use safe_arith::SafeArith;
use serde::{Deserialize, Serialize};
use slog::{debug, error, info, trace, warn, Logger};
use ssz::{Decode, Encode};
@@ -36,13 +37,14 @@ use state_processing::{
SlotProcessingError,
};
use std::cmp::min;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use std::num::NonZeroUsize;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use types::data_column_sidecar::{ColumnIndex, DataColumnSidecar, DataColumnSidecarList};
use types::light_client_update::CurrentSyncCommitteeProofLen;
use types::*;
/// On-disk database that stores finalized states efficiently.
@@ -634,6 +636,143 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
.map(|payload| payload.is_some())
}
/// Get the sync committee branch for the given block root.
///
/// Note: we only persist sync committee branches for checkpoint slots, so
/// `Ok(None)` is the expected result for most block roots.
pub fn get_sync_committee_branch(
    &self,
    block_root: &Hash256,
) -> Result<Option<FixedVector<Hash256, CurrentSyncCommitteeProofLen>>, Error> {
    let key = block_root.as_ssz_bytes();
    match self
        .hot_db
        .get_bytes(DBColumn::SyncCommitteeBranch.into(), &key)?
    {
        Some(bytes) => {
            let branch = FixedVector::from_ssz_bytes(&bytes)?;
            Ok(Some(branch))
        }
        None => Ok(None),
    }
}
/// Fetch sync committee by sync committee period.
///
/// The key uses `to_le_bytes` so that reads and writes agree on the same
/// encoding as `store_sync_committee`. (For a `u64` this is byte-for-byte
/// identical to the SSZ encoding used previously, since SSZ uint64 is
/// 8-byte little-endian, so existing on-disk entries remain readable.)
pub fn get_sync_committee(
    &self,
    sync_committee_period: u64,
) -> Result<Option<SyncCommittee<E>>, Error> {
    let column = DBColumn::SyncCommittee;
    if let Some(bytes) = self
        .hot_db
        .get_bytes(column.into(), &sync_committee_period.to_le_bytes())?
    {
        let sync_committee: SyncCommittee<E> = SyncCommittee::from_ssz_bytes(&bytes)?;
        return Ok(Some(sync_committee));
    }
    Ok(None)
}
/// Persist the sync committee branch for `block_root`.
///
/// Counterpart to `get_sync_committee_branch`; the key is the SSZ
/// encoding of the block root.
pub fn store_sync_committee_branch(
    &self,
    block_root: Hash256,
    sync_committee_branch: &FixedVector<Hash256, CurrentSyncCommitteeProofLen>,
) -> Result<(), Error> {
    let key = block_root.as_ssz_bytes();
    let value = sync_committee_branch.as_ssz_bytes();
    self.hot_db
        .put_bytes(DBColumn::SyncCommitteeBranch.into(), &key, &value)?;
    Ok(())
}
/// Persist a sync committee keyed by its sync committee period.
///
/// The key is the little-endian byte encoding of the period; the value is
/// the SSZ-encoded committee.
pub fn store_sync_committee(
    &self,
    sync_committee_period: u64,
    sync_committee: &SyncCommittee<E>,
) -> Result<(), Error> {
    let key = sync_committee_period.to_le_bytes();
    let value = sync_committee.as_ssz_bytes();
    self.hot_db
        .put_bytes(DBColumn::SyncCommittee.into(), &key, &value)?;
    Ok(())
}
/// Fetch a persisted `LightClientUpdate` by sync committee period.
///
/// The fork name is derived from the first epoch of the period
/// (`period * epochs_per_sync_committee_period`, computed with `safe_mul`
/// to guard against overflow) so the SSZ bytes are decoded into the
/// correct fork variant.
pub fn get_light_client_update(
    &self,
    sync_committee_period: u64,
) -> Result<Option<LightClientUpdate<E>>, Error> {
    let column = DBColumn::LightClientUpdate;
    let bytes = match self
        .hot_db
        .get_bytes(column.into(), &sync_committee_period.to_le_bytes())?
    {
        Some(bytes) => bytes,
        None => return Ok(None),
    };
    let epoch = sync_committee_period
        .safe_mul(self.spec.epochs_per_sync_committee_period.into())?;
    let fork_name = self.spec.fork_name_at_epoch(epoch.into());
    let light_client_update = LightClientUpdate::from_ssz_bytes(&bytes, &fork_name)?;
    Ok(Some(light_client_update))
}
pub fn get_light_client_updates(
&self,
start_period: u64,
count: u64,
) -> Result<Vec<LightClientUpdate<E>>, Error> {
let column = DBColumn::LightClientUpdate;
let mut light_client_updates = vec![];
for res in self
.hot_db
.iter_column_from::<Vec<u8>>(column, &start_period.to_le_bytes())
{
let (sync_committee_bytes, light_client_update_bytes) = res?;
let sync_committee_period = u64::from_ssz_bytes(&sync_committee_bytes)?;
let epoch = sync_committee_period
.safe_mul(self.spec.epochs_per_sync_committee_period.into())?;
let fork_name = self.spec.fork_name_at_epoch(epoch.into());
let light_client_update =
LightClientUpdate::from_ssz_bytes(&light_client_update_bytes, &fork_name)?;
light_client_updates.push(light_client_update);
if sync_committee_period >= start_period + count {
break;
}
}
Ok(light_client_updates)
}
/// Persist a `LightClientUpdate` keyed by its sync committee period.
///
/// Counterpart to `get_light_client_update`; the key is the little-endian
/// byte encoding of the period.
pub fn store_light_client_update(
    &self,
    sync_committee_period: u64,
    light_client_update: &LightClientUpdate<E>,
) -> Result<(), Error> {
    let key = sync_committee_period.to_le_bytes();
    let value = light_client_update.as_ssz_bytes();
    self.hot_db
        .put_bytes(DBColumn::LightClientUpdate.into(), &key, &value)?;
    Ok(())
}
/// Check if the blobs for a block exists on disk.
pub fn blobs_exist(&self, block_root: &Hash256) -> Result<bool, Error> {
self.blobs_db
@@ -1037,6 +1176,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
}
StoreOp::DeleteSyncCommitteeBranch(block_root) => {
let key = get_key_for_col(
DBColumn::SyncCommitteeBranch.into(),
block_root.as_slice(),
);
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
}
StoreOp::KeyValueOp(kv_op) => {
key_value_batch.push(kv_op);
}
@@ -1182,6 +1329,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
StoreOp::DeleteExecutionPayload(_) => (),
StoreOp::DeleteSyncCommitteeBranch(_) => (),
StoreOp::KeyValueOp(_) => (),
}
}
@@ -2816,12 +2965,16 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
.into());
}
// finalized_state.slot() must be at an epoch boundary
// else we may introduce bugs to the migration/pruning logic
if finalized_state.slot() % E::slots_per_epoch() != 0 {
return Err(HotColdDBError::FreezeSlotUnaligned(finalized_state.slot()).into());
}
let mut hot_db_ops = vec![];
let mut cold_db_ops = vec![];
let mut epoch_boundary_blocks = HashSet::new();
let mut non_checkpoint_block_roots = HashSet::new();
// Chunk writer for the linear block roots in the freezer DB.
// Start at the new upper limit because we iterate backwards.
@@ -2849,6 +3002,22 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root));
}
// At a missed slot, `state_root_iter` will return the block root
// from the previous non-missed slot. This ensures that the block root at an
// epoch boundary is always a checkpoint block root. We keep track of block roots
// at epoch boundaries by storing them in the `epoch_boundary_blocks` hash set.
// We then ensure that block roots at the epoch boundary aren't included in the
// `non_checkpoint_block_roots` hash set.
if slot % E::slots_per_epoch() == 0 {
epoch_boundary_blocks.insert(block_root);
} else {
non_checkpoint_block_roots.insert(block_root);
}
if epoch_boundary_blocks.contains(&block_root) {
non_checkpoint_block_roots.remove(&block_root);
}
// Delete the old summary, and the full state if we lie on an epoch boundary.
hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot)));
@@ -2888,6 +3057,19 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
}
}
// Prune sync committee branch data for all non checkpoint block roots.
// Note that `non_checkpoint_block_roots` should only contain non checkpoint block roots
// as long as `finalized_state.slot()` is at an epoch boundary. If this were not the case
// we risk the chance of pruning a `sync_committee_branch` for a checkpoint block root.
// E.g. if `current_split_slot` = (Epoch A slot 0) and `finalized_state.slot()` = (Epoch C slot 31)
// and (Epoch D slot 0) is a skipped slot, we will have pruned a `sync_committee_branch`
// for a checkpoint block root.
non_checkpoint_block_roots
.into_iter()
.for_each(|block_root| {
hot_db_ops.push(StoreOp::DeleteSyncCommitteeBranch(block_root));
});
// Finish writing the block roots and commit the remaining cold DB ops.
block_root_writer.write(&mut cold_db_ops)?;
store.cold_db.do_atomically(cold_db_ops)?;
@@ -2904,7 +3086,6 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
// Flush to disk all the states that have just been migrated to the cold store.
store.cold_db.sync()?;
{
let mut split_guard = store.split.write();
let latest_split_slot = split_guard.slot;

View File

@@ -241,6 +241,7 @@ pub enum StoreOp<'a, E: EthSpec> {
DeleteDataColumns(Hash256, Vec<ColumnIndex>),
DeleteState(Hash256, Option<Slot>),
DeleteExecutionPayload(Hash256),
DeleteSyncCommitteeBranch(Hash256),
KeyValueOp(KeyValueStoreOp),
}
@@ -303,6 +304,12 @@ pub enum DBColumn {
/// For persisting eagerly computed light client data
#[strum(serialize = "lcu")]
LightClientUpdate,
/// For helping persist eagerly computed light client bootstrap data
#[strum(serialize = "scb")]
SyncCommitteeBranch,
/// For helping persist eagerly computed light client bootstrap data
#[strum(serialize = "scm")]
SyncCommittee,
}
/// A block from the database, which might have an execution payload or not.
@@ -346,6 +353,8 @@ impl DBColumn {
| Self::BeaconHistoricalRoots
| Self::BeaconHistoricalSummaries
| Self::BeaconRandaoMixes
| Self::SyncCommittee
| Self::SyncCommitteeBranch
| Self::LightClientUpdate => 8,
Self::BeaconDataColumn => DATA_COLUMN_DB_KEY_SIZE,
}