mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-23 14:54:45 +00:00
Fix rebase conflicts
This commit is contained in:
@@ -59,8 +59,8 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
||||
pub(crate) config: StoreConfig,
|
||||
/// Cold database containing compact historical data.
|
||||
pub cold_db: Cold,
|
||||
/// Cold database containing blob data with slots less than `split.slot`.
|
||||
pub cold_blobs_db: Option<Cold>,
|
||||
/// Database containing blobs.
|
||||
pub blobs_db: Option<Cold>,
|
||||
/// Hot database containing duplicated but quick-to-access recent data.
|
||||
///
|
||||
/// The hot database also contains all blocks.
|
||||
@@ -101,7 +101,7 @@ pub enum HotColdDBError {
|
||||
MissingExecutionPayload(Hash256),
|
||||
MissingFullBlockExecutionPayloadPruned(Hash256, Slot),
|
||||
MissingAnchorInfo,
|
||||
MissingPathToBlobsFreezer,
|
||||
MissingPathToBlobsDatabase,
|
||||
HotStateSummaryError(BeaconStateError),
|
||||
RestorePointDecodeError(ssz::DecodeError),
|
||||
BlockReplayBeaconError(BeaconStateError),
|
||||
@@ -138,7 +138,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
|
||||
anchor_info: RwLock::new(None),
|
||||
blob_info: RwLock::new(BlobInfo::default()),
|
||||
cold_db: MemoryStore::open(),
|
||||
cold_blobs_db: Some(MemoryStore::open()),
|
||||
blobs_db: Some(MemoryStore::open()),
|
||||
hot_db: MemoryStore::open(),
|
||||
block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
|
||||
blob_cache: Mutex::new(LruCache::new(config.blob_cache_size)),
|
||||
@@ -162,7 +162,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
||||
pub fn open(
|
||||
hot_path: &Path,
|
||||
cold_path: &Path,
|
||||
cold_blobs_path: Option<PathBuf>,
|
||||
blobs_db_path: Option<PathBuf>,
|
||||
migrate_schema: impl FnOnce(Arc<Self>, SchemaVersion, SchemaVersion) -> Result<(), Error>,
|
||||
config: StoreConfig,
|
||||
spec: ChainSpec,
|
||||
@@ -175,7 +175,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
||||
anchor_info: RwLock::new(None),
|
||||
blob_info: RwLock::new(BlobInfo::default()),
|
||||
cold_db: LevelDB::open(cold_path)?,
|
||||
cold_blobs_db: None,
|
||||
blobs_db: None,
|
||||
hot_db: LevelDB::open(hot_path)?,
|
||||
block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
|
||||
blob_cache: Mutex::new(LruCache::new(config.blob_cache_size)),
|
||||
@@ -220,23 +220,29 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
||||
);
|
||||
}
|
||||
|
||||
if db.spec.eip4844_fork_epoch.is_some() {
|
||||
let blob_info = match db.load_blob_info()? {
|
||||
Some(mut blob_info) => {
|
||||
if blob_info.blobs_freezer {
|
||||
cold_blobs_path
|
||||
.as_ref()
|
||||
.ok_or(HotColdDBError::MissingPathToBlobsFreezer)?;
|
||||
}
|
||||
if let Some(path) = cold_blobs_path {
|
||||
db.cold_blobs_db = Some(LevelDB::open(path.as_path())?);
|
||||
blob_info.blobs_freezer = true;
|
||||
}
|
||||
Some(blob_info)
|
||||
}
|
||||
None => Some(BlobInfo::default()),
|
||||
};
|
||||
*db.blob_info.write() = blob_info;
|
||||
let blob_info_on_disk = db.load_blob_info()?;
|
||||
|
||||
if let Some(ref blob_info) = blob_info_on_disk {
|
||||
let prev_blobs_db = blob_info.blobs_db;
|
||||
if prev_blobs_db {
|
||||
blobs_db_path
|
||||
.as_ref()
|
||||
.ok_or(HotColdDBError::MissingPathToBlobsDatabase)?;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(path) = blobs_db_path {
|
||||
if db.spec.eip4844_fork_epoch.is_some() {
|
||||
db.blobs_db = Some(LevelDB::open(path.as_path())?);
|
||||
db.compare_and_set_blob_info_with_write(
|
||||
blob_info_on_disk,
|
||||
Some(BlobInfo { blobs_db: true }),
|
||||
)?;
|
||||
info!(
|
||||
db.log,
|
||||
"Blobs DB initialized";
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that the schema version of the on-disk database matches the software.
|
||||
@@ -547,29 +553,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Fetch a blobs sidecar from the store.
|
||||
///
|
||||
/// If `slot` is provided then it will be used as a hint as to which database should
|
||||
/// be checked first.
|
||||
pub fn get_blobs(
|
||||
&self,
|
||||
block_root: &Hash256,
|
||||
slot: Option<Slot>,
|
||||
) -> Result<Option<BlobsSidecar<E>>, Error> {
|
||||
if let Some(slot) = slot {
|
||||
if slot < self.get_split_slot() {
|
||||
return match self.load_cold_blobs(block_root)? {
|
||||
Some(blobs) => Ok(Some(blobs)),
|
||||
None => self.load_hot_blobs(block_root),
|
||||
};
|
||||
}
|
||||
}
|
||||
match self.load_hot_blobs(block_root)? {
|
||||
Some(blobs) => Ok(Some(blobs)),
|
||||
None => self.load_cold_blobs(block_root),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn blobs_as_kv_store_ops(
|
||||
&self,
|
||||
key: &Hash256,
|
||||
@@ -910,6 +893,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
|
||||
self.hot_db
|
||||
.do_atomically(self.convert_to_kv_batch(batch)?)?;
|
||||
|
||||
drop(guard);
|
||||
drop(guard_blob);
|
||||
|
||||
@@ -1246,35 +1230,22 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
})
|
||||
}
|
||||
|
||||
/// Load a blobs sidecar from the hot database.
|
||||
pub fn load_hot_blobs(&self, block_root: &Hash256) -> Result<Option<BlobsSidecar<E>>, Error> {
|
||||
// FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks,
|
||||
// may want to attempt to use one again
|
||||
match self
|
||||
.hot_db
|
||||
.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())?
|
||||
{
|
||||
Some(bytes) => {
|
||||
let ret = BlobsSidecar::from_ssz_bytes(&bytes)?;
|
||||
self.blob_cache.lock().put(*block_root, ret.clone());
|
||||
Ok(Some(ret))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to load a blobs from the freezer database.
|
||||
///
|
||||
/// Return `None` if no blobs sidecar with `block_root` lies in the freezer.
|
||||
pub fn load_cold_blobs(&self, block_root: &Hash256) -> Result<Option<BlobsSidecar<E>>, Error> {
|
||||
let blobs_freezer = if let Some(ref cold_blobs_db) = self.cold_blobs_db {
|
||||
cold_blobs_db
|
||||
/// Fetch a blobs sidecar from the store.
|
||||
pub fn get_blobs(&self, block_root: &Hash256) -> Result<Option<BlobsSidecar<E>>, Error> {
|
||||
let blobs_db = if let Some(ref blobs_db) = self.blobs_db {
|
||||
blobs_db
|
||||
} else {
|
||||
&self.cold_db
|
||||
};
|
||||
|
||||
match blobs_freezer.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? {
|
||||
Some(ref blobs_bytes) => Ok(Some(BlobsSidecar::from_ssz_bytes(blobs_bytes)?)),
|
||||
match blobs_db.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? {
|
||||
Some(ref blobs_bytes) => {
|
||||
let blobs = BlobsSidecar::from_ssz_bytes(blobs_bytes)?;
|
||||
// FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks,
|
||||
// may want to attempt to use one again
|
||||
self.blob_cache.lock().put(*block_root, blobs.clone());
|
||||
Ok(Some(blobs))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
@@ -1985,12 +1956,6 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
}
|
||||
|
||||
let mut hot_db_ops: Vec<StoreOp<E>> = Vec::new();
|
||||
let mut cold_blobs_db_ops: Vec<StoreOp<E>> = Vec::new();
|
||||
|
||||
let blobs_freezer = match store.cold_blobs_db {
|
||||
Some(ref cold_blobs_db) => cold_blobs_db,
|
||||
None => &store.cold_db,
|
||||
};
|
||||
|
||||
// 1. Copy all of the states between the head and the split slot, from the hot DB
|
||||
// to the cold DB. Delete the execution payloads of these now-finalized blocks.
|
||||
@@ -2034,17 +1999,8 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
if store.config.prune_payloads {
|
||||
hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root));
|
||||
}
|
||||
|
||||
// Prepare migration of blobs to freezer.
|
||||
if let Some(blobs) = store.get_blobs(&block_root, Some(slot))? {
|
||||
hot_db_ops.push(StoreOp::DeleteBlobs(block_root));
|
||||
cold_blobs_db_ops.push(StoreOp::PutBlobs(block_root, Arc::new(blobs)));
|
||||
}
|
||||
}
|
||||
|
||||
// Migrate blobs to freezer.
|
||||
blobs_freezer.do_atomically(store.convert_to_kv_batch(cold_blobs_db_ops)?)?;
|
||||
|
||||
// Warning: Critical section. We have to take care not to put any of the two databases in an
|
||||
// inconsistent state if the OS process dies at any point during the freezing
|
||||
// procedure.
|
||||
@@ -2057,9 +2013,6 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
|
||||
// Flush to disk all the states that have just been migrated to the cold store.
|
||||
store.cold_db.sync()?;
|
||||
if let Some(ref cold_blobs_db) = store.cold_blobs_db {
|
||||
cold_blobs_db.sync()?;
|
||||
}
|
||||
|
||||
{
|
||||
let mut split_guard = store.split.write();
|
||||
|
||||
@@ -124,6 +124,8 @@ impl StoreItem for AnchorInfo {
|
||||
pub struct BlobInfo {
|
||||
/// The slot after which blobs are available (>=).
|
||||
pub oldest_blob_slot: Option<Slot>,
|
||||
/// A separate blobs database is in use.
|
||||
pub blobs_db: bool,
|
||||
}
|
||||
|
||||
impl StoreItem for BlobInfo {
|
||||
|
||||
Reference in New Issue
Block a user