mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-16 11:22:56 +00:00
Store blobs in separate freezer or historical state freezer
This commit is contained in:
@@ -35,7 +35,7 @@ use state_processing::{
|
||||
use std::cmp::min;
|
||||
use std::convert::TryInto;
|
||||
use std::marker::PhantomData;
|
||||
use std::path::Path;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS;
|
||||
@@ -59,6 +59,8 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
||||
pub(crate) config: StoreConfig,
|
||||
/// Cold database containing compact historical data.
|
||||
pub cold_db: Cold,
|
||||
/// Cold database containing blob data with slots less than `split.slot`.
|
||||
pub cold_blobs_db: Option<Cold>,
|
||||
/// Hot database containing duplicated but quick-to-access recent data.
|
||||
///
|
||||
/// The hot database also contains all blocks.
|
||||
@@ -92,6 +94,7 @@ pub enum HotColdDBError {
|
||||
MissingRestorePointHash(u64),
|
||||
MissingRestorePoint(Hash256),
|
||||
MissingColdStateSummary(Hash256),
|
||||
MissingColdBlobs(Hash256),
|
||||
MissingHotStateSummary(Hash256),
|
||||
MissingEpochBoundaryState(Hash256),
|
||||
MissingSplitState(Hash256, Slot),
|
||||
@@ -134,6 +137,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
|
||||
anchor_info: RwLock::new(None),
|
||||
blob_info: RwLock::new(BlobInfo::default()),
|
||||
cold_db: MemoryStore::open(),
|
||||
cold_blobs_db: Some(MemoryStore::open()),
|
||||
hot_db: MemoryStore::open(),
|
||||
block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
|
||||
blob_cache: Mutex::new(LruCache::new(config.blob_cache_size)),
|
||||
@@ -157,6 +161,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
||||
pub fn open(
|
||||
hot_path: &Path,
|
||||
cold_path: &Path,
|
||||
cold_blobs_path: Option<PathBuf>,
|
||||
migrate_schema: impl FnOnce(Arc<Self>, SchemaVersion, SchemaVersion) -> Result<(), Error>,
|
||||
config: StoreConfig,
|
||||
spec: ChainSpec,
|
||||
@@ -164,11 +169,18 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
||||
) -> Result<Arc<Self>, Error> {
|
||||
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
|
||||
|
||||
let cold_blobs_db = if let Some(path) = cold_blobs_path {
|
||||
Some(LevelDB::open(path.as_path())?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let mut db = HotColdDB {
|
||||
split: RwLock::new(Split::default()),
|
||||
anchor_info: RwLock::new(None),
|
||||
blob_info: RwLock::new(BlobInfo::default()),
|
||||
cold_db: LevelDB::open(cold_path)?,
|
||||
cold_blobs_db,
|
||||
hot_db: LevelDB::open(hot_path)?,
|
||||
block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
|
||||
blob_cache: Mutex::new(LruCache::new(config.blob_cache_size)),
|
||||
@@ -532,7 +544,19 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
self.blob_cache.lock().put(*block_root, ret.clone());
|
||||
Ok(Some(ret))
|
||||
} else {
|
||||
Ok(None)
|
||||
let blobs_freezer = if let Some(ref cold_blobs_db) = self.cold_blobs_db {
|
||||
cold_blobs_db
|
||||
} else {
|
||||
&self.cold_db
|
||||
};
|
||||
|
||||
if let Some(ref blobs_bytes) =
|
||||
blobs_freezer.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())?
|
||||
{
|
||||
Ok(Some(BlobsSidecar::from_ssz_bytes(blobs_bytes)?))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1918,6 +1942,13 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
}
|
||||
|
||||
let mut hot_db_ops: Vec<StoreOp<E>> = Vec::new();
|
||||
let mut cold_blobs_db_ops: Vec<StoreOp<E>> = Vec::new();
|
||||
|
||||
let blobs_freezer = if let Some(ref cold_blobs_db) = store.cold_blobs_db {
|
||||
cold_blobs_db
|
||||
} else {
|
||||
&store.cold_db
|
||||
};
|
||||
|
||||
// 1. Copy all of the states between the head and the split slot, from the hot DB
|
||||
// to the cold DB. Delete the execution payloads of these now-finalized blocks.
|
||||
@@ -1961,8 +1992,17 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
if store.config.prune_payloads {
|
||||
hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root));
|
||||
}
|
||||
|
||||
// Prepare migration of blobs to freezer.
|
||||
if let Some(blobs) = store.get_blobs(&block_root)? {
|
||||
hot_db_ops.push(StoreOp::DeleteBlobs(block_root));
|
||||
cold_blobs_db_ops.push(StoreOp::PutBlobs(block_root, Arc::new(blobs)));
|
||||
}
|
||||
}
|
||||
|
||||
// Migrate blobs to freezer.
|
||||
blobs_freezer.do_atomically(store.convert_to_kv_batch(cold_blobs_db_ops)?)?;
|
||||
|
||||
// Warning: Critical section. We have to take care not to put any of the two databases in an
|
||||
// inconsistent state if the OS process dies at any point during the freezing
|
||||
// procedure.
|
||||
@@ -1975,6 +2015,9 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
|
||||
// Flush to disk all the states that have just been migrated to the cold store.
|
||||
store.cold_db.sync()?;
|
||||
if let Some(ref cold_blobs_db) = store.cold_blobs_db {
|
||||
cold_blobs_db.sync()?;
|
||||
}
|
||||
|
||||
{
|
||||
let mut split_guard = store.split.write();
|
||||
|
||||
Reference in New Issue
Block a user