mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-17 04:48:21 +00:00
Always use a separate database for blobs (#4892)
* Always use a separate blobs DB * Add + update tests
This commit is contained in:
@@ -35,7 +35,7 @@ use state_processing::{
|
||||
use std::cmp::min;
|
||||
use std::convert::TryInto;
|
||||
use std::marker::PhantomData;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use types::blob_sidecar::BlobSidecarList;
|
||||
@@ -61,7 +61,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
||||
/// Cold database containing compact historical data.
|
||||
pub cold_db: Cold,
|
||||
/// Database containing blobs. If None, store falls back to use `cold_db`.
|
||||
pub blobs_db: Option<Cold>,
|
||||
pub blobs_db: Cold,
|
||||
/// Hot database containing duplicated but quick-to-access recent data.
|
||||
///
|
||||
/// The hot database also contains all blocks.
|
||||
@@ -138,7 +138,6 @@ pub enum HotColdDBError {
|
||||
MissingExecutionPayload(Hash256),
|
||||
MissingFullBlockExecutionPayloadPruned(Hash256, Slot),
|
||||
MissingAnchorInfo,
|
||||
MissingPathToBlobsDatabase,
|
||||
BlobsPreviouslyInDefaultStore,
|
||||
HotStateSummaryError(BeaconStateError),
|
||||
RestorePointDecodeError(ssz::DecodeError),
|
||||
@@ -178,7 +177,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
|
||||
anchor_info: RwLock::new(None),
|
||||
blob_info: RwLock::new(BlobInfo::default()),
|
||||
cold_db: MemoryStore::open(),
|
||||
blobs_db: Some(MemoryStore::open()),
|
||||
blobs_db: MemoryStore::open(),
|
||||
hot_db: MemoryStore::open(),
|
||||
block_cache: Mutex::new(BlockCache::new(config.block_cache_size)),
|
||||
state_cache: Mutex::new(LruCache::new(config.historic_state_cache_size)),
|
||||
@@ -202,7 +201,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
||||
pub fn open(
|
||||
hot_path: &Path,
|
||||
cold_path: &Path,
|
||||
blobs_db_path: Option<PathBuf>,
|
||||
blobs_db_path: &Path,
|
||||
migrate_schema: impl FnOnce(Arc<Self>, SchemaVersion, SchemaVersion) -> Result<(), Error>,
|
||||
config: StoreConfig,
|
||||
spec: ChainSpec,
|
||||
@@ -215,7 +214,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
||||
anchor_info: RwLock::new(None),
|
||||
blob_info: RwLock::new(BlobInfo::default()),
|
||||
cold_db: LevelDB::open(cold_path)?,
|
||||
blobs_db: None,
|
||||
blobs_db: LevelDB::open(blobs_db_path)?,
|
||||
hot_db: LevelDB::open(hot_path)?,
|
||||
block_cache: Mutex::new(BlockCache::new(config.block_cache_size)),
|
||||
state_cache: Mutex::new(LruCache::new(config.historic_state_cache_size)),
|
||||
@@ -271,37 +270,29 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
||||
Some(blob_info) => {
|
||||
// If the oldest block slot is already set do not allow the blob DB path to be
|
||||
// changed (require manual migration).
|
||||
if blob_info.oldest_blob_slot.is_some() {
|
||||
if blobs_db_path.is_some() && !blob_info.blobs_db {
|
||||
return Err(HotColdDBError::BlobsPreviouslyInDefaultStore.into());
|
||||
} else if blobs_db_path.is_none() && blob_info.blobs_db {
|
||||
return Err(HotColdDBError::MissingPathToBlobsDatabase.into());
|
||||
}
|
||||
if blob_info.oldest_blob_slot.is_some() && !blob_info.blobs_db {
|
||||
return Err(HotColdDBError::BlobsPreviouslyInDefaultStore.into());
|
||||
}
|
||||
// Set the oldest blob slot to the Deneb fork slot if it is not yet set.
|
||||
// Always initialize `blobs_db` to true, we no longer support storing the blobs
|
||||
// in the freezer DB, because the UX is strictly worse for relocating the DB.
|
||||
let oldest_blob_slot = blob_info.oldest_blob_slot.or(deneb_fork_slot);
|
||||
BlobInfo {
|
||||
oldest_blob_slot,
|
||||
blobs_db: blobs_db_path.is_some(),
|
||||
blobs_db: true,
|
||||
}
|
||||
}
|
||||
// First start.
|
||||
None => BlobInfo {
|
||||
// Set the oldest blob slot to the Deneb fork slot if it is not yet set.
|
||||
oldest_blob_slot: deneb_fork_slot,
|
||||
blobs_db: blobs_db_path.is_some(),
|
||||
blobs_db: true,
|
||||
},
|
||||
};
|
||||
if new_blob_info.blobs_db {
|
||||
if let Some(path) = &blobs_db_path {
|
||||
db.blobs_db = Some(LevelDB::open(path.as_path())?);
|
||||
}
|
||||
}
|
||||
db.compare_and_set_blob_info_with_write(<_>::default(), new_blob_info.clone())?;
|
||||
info!(
|
||||
db.log,
|
||||
"Blob DB initialized";
|
||||
"separate_db" => new_blob_info.blobs_db,
|
||||
"path" => ?blobs_db_path,
|
||||
"oldest_blob_slot" => ?new_blob_info.oldest_blob_slot,
|
||||
);
|
||||
@@ -575,8 +566,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
|
||||
/// Check if the blobs for a block exists on disk.
|
||||
pub fn blobs_exist(&self, block_root: &Hash256) -> Result<bool, Error> {
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
blobs_db.key_exists(DBColumn::BeaconBlob.into(), block_root.as_bytes())
|
||||
self.blobs_db
|
||||
.key_exists(DBColumn::BeaconBlob.into(), block_root.as_bytes())
|
||||
}
|
||||
|
||||
/// Determine whether a block exists in the database.
|
||||
@@ -592,13 +583,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
.key_delete(DBColumn::BeaconBlock.into(), block_root.as_bytes())?;
|
||||
self.hot_db
|
||||
.key_delete(DBColumn::ExecPayload.into(), block_root.as_bytes())?;
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
blobs_db.key_delete(DBColumn::BeaconBlob.into(), block_root.as_bytes())
|
||||
self.blobs_db
|
||||
.key_delete(DBColumn::BeaconBlob.into(), block_root.as_bytes())
|
||||
}
|
||||
|
||||
pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobSidecarList<E>) -> Result<(), Error> {
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
blobs_db.put_bytes(
|
||||
self.blobs_db.put_bytes(
|
||||
DBColumn::BeaconBlob.into(),
|
||||
block_root.as_bytes(),
|
||||
&blobs.as_ssz_bytes(),
|
||||
@@ -988,9 +978,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
let mut guard = self.block_cache.lock();
|
||||
|
||||
let blob_cache_ops = blobs_ops.clone();
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
// Try to execute blobs store ops.
|
||||
blobs_db.do_atomically(self.convert_to_kv_batch(blobs_ops)?)?;
|
||||
self.blobs_db
|
||||
.do_atomically(self.convert_to_kv_batch(blobs_ops)?)?;
|
||||
|
||||
let hot_db_cache_ops = hot_db_ops.clone();
|
||||
// Try to execute hot db store ops.
|
||||
@@ -1018,7 +1008,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
};
|
||||
*op = reverse_op;
|
||||
}
|
||||
blobs_db.do_atomically(self.convert_to_kv_batch(blob_cache_ops)?)?;
|
||||
self.blobs_db
|
||||
.do_atomically(self.convert_to_kv_batch(blob_cache_ops)?)?;
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
@@ -1436,15 +1427,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
|
||||
/// Fetch blobs for a given block from the store.
|
||||
pub fn get_blobs(&self, block_root: &Hash256) -> Result<Option<BlobSidecarList<E>>, Error> {
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
|
||||
// Check the cache.
|
||||
if let Some(blobs) = self.block_cache.lock().get_blobs(block_root) {
|
||||
metrics::inc_counter(&metrics::BEACON_BLOBS_CACHE_HIT_COUNT);
|
||||
return Ok(Some(blobs.clone()));
|
||||
}
|
||||
|
||||
match blobs_db.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? {
|
||||
match self
|
||||
.blobs_db
|
||||
.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())?
|
||||
{
|
||||
Some(ref blobs_bytes) => {
|
||||
let blobs = BlobSidecarList::from_ssz_bytes(blobs_bytes)?;
|
||||
self.block_cache
|
||||
@@ -1640,7 +1632,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
});
|
||||
let blob_info = BlobInfo {
|
||||
oldest_blob_slot,
|
||||
blobs_db: self.blobs_db.is_some(),
|
||||
blobs_db: true,
|
||||
};
|
||||
self.compare_and_set_blob_info(self.get_blob_info(), blob_info)
|
||||
}
|
||||
|
||||
@@ -135,7 +135,7 @@ pub struct BlobInfo {
|
||||
/// If the `oldest_blob_slot` is `None` then this means that the Deneb fork epoch is not yet
|
||||
/// known.
|
||||
pub oldest_blob_slot: Option<Slot>,
|
||||
/// A separate blobs database is in use.
|
||||
/// A separate blobs database is in use (deprecated, always `true`).
|
||||
pub blobs_db: bool,
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user