mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-22 14:24:44 +00:00
Optimise beacon chain persistence (#851)
* Unfinished progress * Update more persistence code * Start fixing tests * Combine persist head and fork choice * Persist head on reorg * Gracefully handle op pool and eth1 cache missing * Fix test failure * Address Michael's comments
This commit is contained in:
@@ -1,7 +1,11 @@
|
||||
use crate::eth1_chain::CachingEth1Backend;
|
||||
use crate::beacon_chain::{
|
||||
BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY,
|
||||
};
|
||||
use crate::eth1_chain::{CachingEth1Backend, SszEth1};
|
||||
use crate::events::NullEventHandler;
|
||||
use crate::fork_choice::SszForkChoice;
|
||||
use crate::head_tracker::HeadTracker;
|
||||
use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY};
|
||||
use crate::persisted_beacon_chain::PersistedBeaconChain;
|
||||
use crate::shuffling_cache::ShufflingCache;
|
||||
use crate::timeout_rw_lock::TimeoutRwLock;
|
||||
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
|
||||
@@ -10,7 +14,7 @@ use crate::{
|
||||
ForkChoice,
|
||||
};
|
||||
use eth1::Config as Eth1Config;
|
||||
use operation_pool::OperationPool;
|
||||
use operation_pool::{OperationPool, PersistedOperationPool};
|
||||
use proto_array_fork_choice::ProtoArrayForkChoice;
|
||||
use slog::{info, Logger};
|
||||
use slot_clock::{SlotClock, TestingSlotClock};
|
||||
@@ -67,6 +71,7 @@ where
|
||||
pub struct BeaconChainBuilder<T: BeaconChainTypes> {
|
||||
store: Option<Arc<T::Store>>,
|
||||
store_migrator: Option<T::StoreMigrator>,
|
||||
canonical_head: Option<CheckPoint<T::EthSpec>>,
|
||||
/// The finalized checkpoint to anchor the chain. May be genesis or a higher
|
||||
/// checkpoint.
|
||||
pub finalized_checkpoint: Option<CheckPoint<T::EthSpec>>,
|
||||
@@ -76,7 +81,6 @@ pub struct BeaconChainBuilder<T: BeaconChainTypes> {
|
||||
eth1_chain: Option<Eth1Chain<T::Eth1Chain, T::EthSpec, T::Store>>,
|
||||
event_handler: Option<T::EventHandler>,
|
||||
slot_clock: Option<T::SlotClock>,
|
||||
persisted_beacon_chain: Option<PersistedBeaconChain<T>>,
|
||||
head_tracker: Option<HeadTracker>,
|
||||
data_dir: Option<PathBuf>,
|
||||
pubkey_cache_path: Option<PathBuf>,
|
||||
@@ -105,6 +109,7 @@ where
|
||||
Self {
|
||||
store: None,
|
||||
store_migrator: None,
|
||||
canonical_head: None,
|
||||
finalized_checkpoint: None,
|
||||
genesis_block_root: None,
|
||||
op_pool: None,
|
||||
@@ -112,7 +117,6 @@ where
|
||||
eth1_chain: None,
|
||||
event_handler: None,
|
||||
slot_clock: None,
|
||||
persisted_beacon_chain: None,
|
||||
head_tracker: None,
|
||||
pubkey_cache_path: None,
|
||||
data_dir: None,
|
||||
@@ -162,10 +166,22 @@ where
|
||||
self
|
||||
}
|
||||
|
||||
/// Attempt to load an existing eth1 cache from the builder's `Store`.
|
||||
pub fn get_persisted_eth1_backend(&self) -> Result<Option<SszEth1>, String> {
|
||||
let store = self
|
||||
.store
|
||||
.clone()
|
||||
.ok_or_else(|| "get_persisted_eth1_backend requires a store.".to_string())?;
|
||||
|
||||
store
|
||||
.get::<SszEth1>(&Hash256::from_slice(Ð1_CACHE_DB_KEY))
|
||||
.map_err(|e| format!("DB error whilst reading eth1 cache: {:?}", e))
|
||||
}
|
||||
|
||||
/// Attempt to load an existing chain from the builder's `Store`.
|
||||
///
|
||||
/// May initialize several components; including the op_pool and finalized checkpoints.
|
||||
pub fn resume_from_db(mut self, config: Eth1Config) -> Result<Self, String> {
|
||||
pub fn resume_from_db(mut self) -> Result<Self, String> {
|
||||
let log = self
|
||||
.log
|
||||
.as_ref()
|
||||
@@ -187,37 +203,63 @@ where
|
||||
.clone()
|
||||
.ok_or_else(|| "load_from_store requires a store.".to_string())?;
|
||||
|
||||
let key = Hash256::from_slice(&BEACON_CHAIN_DB_KEY.as_bytes());
|
||||
let p: PersistedBeaconChain<
|
||||
Witness<TStore, TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec, TEventHandler>,
|
||||
> = match store.get(&key) {
|
||||
Err(e) => {
|
||||
return Err(format!(
|
||||
"DB error when reading persisted beacon chain: {:?}",
|
||||
e
|
||||
))
|
||||
}
|
||||
Ok(None) => return Err("No persisted beacon chain found in store".into()),
|
||||
Ok(Some(p)) => p,
|
||||
};
|
||||
let chain = store
|
||||
.get::<PersistedBeaconChain>(&Hash256::from_slice(&BEACON_CHAIN_DB_KEY))
|
||||
.map_err(|e| format!("DB error when reading persisted beacon chain: {:?}", e))?
|
||||
.ok_or_else(|| {
|
||||
"No persisted beacon chain found in store. Try deleting the .lighthouse/beacon dir."
|
||||
.to_string()
|
||||
})?;
|
||||
|
||||
self.op_pool = Some(
|
||||
p.op_pool
|
||||
.clone()
|
||||
.into_operation_pool(&p.canonical_head.beacon_state, &self.spec),
|
||||
);
|
||||
|
||||
self.finalized_checkpoint = Some(p.finalized_checkpoint.clone());
|
||||
self.genesis_block_root = Some(p.genesis_block_root);
|
||||
self.genesis_block_root = Some(chain.genesis_block_root);
|
||||
self.head_tracker = Some(
|
||||
HeadTracker::from_ssz_container(&p.ssz_head_tracker)
|
||||
HeadTracker::from_ssz_container(&chain.ssz_head_tracker)
|
||||
.map_err(|e| format!("Failed to decode head tracker for database: {:?}", e))?,
|
||||
);
|
||||
self.eth1_chain = match &p.eth1_cache {
|
||||
Some(cache) => Some(Eth1Chain::from_ssz_container(cache, config, store, log)?),
|
||||
None => None,
|
||||
};
|
||||
self.persisted_beacon_chain = Some(p);
|
||||
|
||||
let head_block_root = chain.canonical_head_block_root;
|
||||
let head_block = store
|
||||
.get::<SignedBeaconBlock<TEthSpec>>(&head_block_root)
|
||||
.map_err(|e| format!("DB error when reading head block: {:?}", e))?
|
||||
.ok_or_else(|| "Head block not found in store".to_string())?;
|
||||
let head_state_root = head_block.state_root();
|
||||
let head_state = store
|
||||
.get_state(&head_state_root, Some(head_block.slot()))
|
||||
.map_err(|e| format!("DB error when reading head state: {:?}", e))?
|
||||
.ok_or_else(|| "Head state not found in store".to_string())?;
|
||||
|
||||
self.op_pool = Some(
|
||||
store
|
||||
.get::<PersistedOperationPool<TEthSpec>>(&Hash256::from_slice(&OP_POOL_DB_KEY))
|
||||
.map_err(|e| format!("DB error whilst reading persisted op pool: {:?}", e))?
|
||||
.map(|persisted| persisted.into_operation_pool(&head_state, &self.spec))
|
||||
.unwrap_or_else(|| OperationPool::new()),
|
||||
);
|
||||
|
||||
let finalized_block_root = head_state.finalized_checkpoint.root;
|
||||
let finalized_block = store
|
||||
.get::<SignedBeaconBlock<TEthSpec>>(&finalized_block_root)
|
||||
.map_err(|e| format!("DB error when reading finalized block: {:?}", e))?
|
||||
.ok_or_else(|| "Finalized block not found in store".to_string())?;
|
||||
let finalized_state_root = finalized_block.state_root();
|
||||
let finalized_state = store
|
||||
.get_state(&finalized_state_root, Some(finalized_block.slot()))
|
||||
.map_err(|e| format!("DB error when reading finalized state: {:?}", e))?
|
||||
.ok_or_else(|| "Finalized state not found in store".to_string())?;
|
||||
|
||||
self.finalized_checkpoint = Some(CheckPoint {
|
||||
beacon_block_root: finalized_block_root,
|
||||
beacon_block: finalized_block,
|
||||
beacon_state_root: finalized_state_root,
|
||||
beacon_state: finalized_state,
|
||||
});
|
||||
|
||||
self.canonical_head = Some(CheckPoint {
|
||||
beacon_block_root: head_block_root,
|
||||
beacon_block: head_block,
|
||||
beacon_state_root: head_state_root,
|
||||
beacon_state: head_state,
|
||||
});
|
||||
|
||||
let pubkey_cache = ValidatorPubkeyCache::load_from_file(pubkey_cache_path)
|
||||
.map_err(|e| format!("Unable to open persisted pubkey cache: {:?}", e))?;
|
||||
@@ -322,8 +364,8 @@ where
|
||||
|
||||
// If this beacon chain is being loaded from disk, use the stored head. Otherwise, just use
|
||||
// the finalized checkpoint (which is probably genesis).
|
||||
let mut canonical_head = if let Some(persisted_beacon_chain) = self.persisted_beacon_chain {
|
||||
persisted_beacon_chain.canonical_head
|
||||
let mut canonical_head = if let Some(head) = self.canonical_head {
|
||||
head
|
||||
} else {
|
||||
self.finalized_checkpoint
|
||||
.ok_or_else(|| "Cannot build without a state".to_string())?
|
||||
@@ -414,9 +456,18 @@ where
|
||||
/// If this builder is being "resumed" from disk, then rebuild the last fork choice stored to
|
||||
/// the database. Otherwise, create a new, empty fork choice.
|
||||
pub fn reduced_tree_fork_choice(mut self) -> Result<Self, String> {
|
||||
let fork_choice = if let Some(persisted_beacon_chain) = &self.persisted_beacon_chain {
|
||||
ForkChoice::from_ssz_container(persisted_beacon_chain.fork_choice.clone())
|
||||
.map_err(|e| format!("Unable to decode fork choice from db: {:?}", e))?
|
||||
let store = self
|
||||
.store
|
||||
.clone()
|
||||
.ok_or_else(|| "reduced_tree_fork_choice requires a store.".to_string())?;
|
||||
|
||||
let persisted_fork_choice = store
|
||||
.get::<SszForkChoice>(&Hash256::from_slice(&FORK_CHOICE_DB_KEY))
|
||||
.map_err(|e| format!("DB error when reading persisted fork choice: {:?}", e))?;
|
||||
|
||||
let fork_choice = if let Some(persisted) = persisted_fork_choice {
|
||||
ForkChoice::from_ssz_container(persisted)
|
||||
.map_err(|e| format!("Unable to read persisted fork choice from disk: {:?}", e))?
|
||||
} else {
|
||||
let finalized_checkpoint = &self
|
||||
.finalized_checkpoint
|
||||
@@ -469,11 +520,6 @@ where
|
||||
TEthSpec: EthSpec + 'static,
|
||||
TEventHandler: EventHandler<TEthSpec> + 'static,
|
||||
{
|
||||
/// Sets the `BeaconChain` eth1 back-end to `CachingEth1Backend`.
|
||||
pub fn caching_eth1_backend(self, backend: CachingEth1Backend<TEthSpec, TStore>) -> Self {
|
||||
self.eth1_backend(Some(backend))
|
||||
}
|
||||
|
||||
/// Do not use any eth1 backend. The client will not be able to produce beacon blocks.
|
||||
pub fn no_eth1_backend(self) -> Self {
|
||||
self.eth1_backend(None)
|
||||
|
||||
Reference in New Issue
Block a user