mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-21 22:04:44 +00:00
merge upstream, add back get_blobs logic
This commit is contained in:
@@ -31,7 +31,7 @@ where
|
||||
"Garbage collecting {} temporary states",
|
||||
delete_ops.len() / 2
|
||||
);
|
||||
self.do_atomically(delete_ops)?;
|
||||
self.do_atomically_with_block_and_blobs_cache(delete_ops)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -35,7 +35,7 @@ use state_processing::{
|
||||
use std::cmp::min;
|
||||
use std::convert::TryInto;
|
||||
use std::marker::PhantomData;
|
||||
use std::path::Path;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS;
|
||||
@@ -59,6 +59,8 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
||||
pub(crate) config: StoreConfig,
|
||||
/// Cold database containing compact historical data.
|
||||
pub cold_db: Cold,
|
||||
/// Database containing blobs. If None, store falls back to use `cold_db`.
|
||||
pub blobs_db: Option<Cold>,
|
||||
/// Hot database containing duplicated but quick-to-access recent data.
|
||||
///
|
||||
/// The hot database also contains all blocks.
|
||||
@@ -98,6 +100,8 @@ pub enum HotColdDBError {
|
||||
MissingExecutionPayload(Hash256),
|
||||
MissingFullBlockExecutionPayloadPruned(Hash256, Slot),
|
||||
MissingAnchorInfo,
|
||||
MissingPathToBlobsDatabase,
|
||||
BlobsPreviouslyInDefaultStore,
|
||||
HotStateSummaryError(BeaconStateError),
|
||||
RestorePointDecodeError(ssz::DecodeError),
|
||||
BlockReplayBeaconError(BeaconStateError),
|
||||
@@ -119,6 +123,7 @@ pub enum HotColdDBError {
|
||||
request_slot: Option<Slot>,
|
||||
state_root: Hash256,
|
||||
},
|
||||
Rollback,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
|
||||
@@ -134,6 +139,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
|
||||
anchor_info: RwLock::new(None),
|
||||
blob_info: RwLock::new(BlobInfo::default()),
|
||||
cold_db: MemoryStore::open(),
|
||||
blobs_db: Some(MemoryStore::open()),
|
||||
hot_db: MemoryStore::open(),
|
||||
block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
|
||||
blob_cache: Mutex::new(LruCache::new(config.blob_cache_size)),
|
||||
@@ -157,6 +163,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
||||
pub fn open(
|
||||
hot_path: &Path,
|
||||
cold_path: &Path,
|
||||
blobs_db_path: Option<PathBuf>,
|
||||
migrate_schema: impl FnOnce(Arc<Self>, SchemaVersion, SchemaVersion) -> Result<(), Error>,
|
||||
config: StoreConfig,
|
||||
spec: ChainSpec,
|
||||
@@ -169,6 +176,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
||||
anchor_info: RwLock::new(None),
|
||||
blob_info: RwLock::new(BlobInfo::default()),
|
||||
cold_db: LevelDB::open(cold_path)?,
|
||||
blobs_db: None,
|
||||
hot_db: LevelDB::open(hot_path)?,
|
||||
block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
|
||||
blob_cache: Mutex::new(LruCache::new(config.blob_cache_size)),
|
||||
@@ -213,6 +221,53 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
||||
);
|
||||
}
|
||||
|
||||
// Open separate blobs directory if configured and same configuration was used on previous
|
||||
// run.
|
||||
let blob_info = db.load_blob_info()?;
|
||||
let new_blob_info = {
|
||||
match (&blob_info, &blobs_db_path) {
|
||||
(Some(blob_info), Some(_)) => {
|
||||
if !blob_info.blobs_db {
|
||||
return Err(HotColdDBError::BlobsPreviouslyInDefaultStore.into());
|
||||
}
|
||||
BlobInfo {
|
||||
oldest_blob_slot: blob_info.oldest_blob_slot,
|
||||
blobs_db: true,
|
||||
}
|
||||
}
|
||||
(Some(blob_info), None) => {
|
||||
if blob_info.blobs_db {
|
||||
return Err(HotColdDBError::MissingPathToBlobsDatabase.into());
|
||||
}
|
||||
BlobInfo {
|
||||
oldest_blob_slot: blob_info.oldest_blob_slot,
|
||||
blobs_db: false,
|
||||
}
|
||||
}
|
||||
(None, Some(_)) => BlobInfo {
|
||||
oldest_blob_slot: None,
|
||||
blobs_db: true,
|
||||
}, // first time starting up node
|
||||
(None, None) => BlobInfo {
|
||||
oldest_blob_slot: None,
|
||||
blobs_db: false,
|
||||
}, // first time starting up node
|
||||
}
|
||||
};
|
||||
if new_blob_info.blobs_db {
|
||||
if let Some(path) = &blobs_db_path {
|
||||
db.blobs_db = Some(LevelDB::open(path.as_path())?);
|
||||
}
|
||||
}
|
||||
let blob_info = blob_info.unwrap_or(db.get_blob_info());
|
||||
db.compare_and_set_blob_info_with_write(blob_info, new_blob_info)?;
|
||||
info!(
|
||||
db.log,
|
||||
"Blobs DB initialized";
|
||||
"use separate blobs db" => db.get_blob_info().blobs_db,
|
||||
"path" => ?blobs_db_path
|
||||
);
|
||||
|
||||
// Ensure that the schema version of the on-disk database matches the software.
|
||||
// If the version is mismatched, an automatic migration will be attempted.
|
||||
let db = Arc::new(db);
|
||||
@@ -508,11 +563,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
self.hot_db
|
||||
.key_delete(DBColumn::BeaconBlock.into(), block_root.as_bytes())?;
|
||||
self.hot_db
|
||||
.key_delete(DBColumn::ExecPayload.into(), block_root.as_bytes())
|
||||
.key_delete(DBColumn::ExecPayload.into(), block_root.as_bytes())?;
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
blobs_db.key_delete(DBColumn::BeaconBlob.into(), block_root.as_bytes())
|
||||
}
|
||||
|
||||
pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobsSidecar<E>) -> Result<(), Error> {
|
||||
self.hot_db.put_bytes(
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
blobs_db.put_bytes(
|
||||
DBColumn::BeaconBlob.into(),
|
||||
block_root.as_bytes(),
|
||||
&blobs.as_ssz_bytes(),
|
||||
@@ -521,21 +579,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_blobs(&self, block_root: &Hash256) -> Result<Option<BlobsSidecar<E>>, Error> {
|
||||
// FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks,
|
||||
// may want to attempt to use one again
|
||||
if let Some(bytes) = self
|
||||
.hot_db
|
||||
.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())?
|
||||
{
|
||||
let ret = BlobsSidecar::from_ssz_bytes(&bytes)?;
|
||||
self.blob_cache.lock().put(*block_root, ret.clone());
|
||||
Ok(Some(ret))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn blobs_as_kv_store_ops(
|
||||
&self,
|
||||
key: &Hash256,
|
||||
@@ -832,21 +875,75 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
Ok(key_value_batch)
|
||||
}
|
||||
|
||||
pub fn do_atomically(&self, batch: Vec<StoreOp<E>>) -> Result<(), Error> {
|
||||
// Update the block cache whilst holding a lock, to ensure that the cache updates atomically
|
||||
// with the database.
|
||||
pub fn do_atomically_with_block_and_blobs_cache(
|
||||
&self,
|
||||
batch: Vec<StoreOp<E>>,
|
||||
) -> Result<(), Error> {
|
||||
let mut blobs_to_delete = Vec::new();
|
||||
let (blobs_ops, hot_db_ops): (Vec<StoreOp<E>>, Vec<StoreOp<E>>) =
|
||||
batch.into_iter().partition(|store_op| match store_op {
|
||||
StoreOp::PutBlobs(_, _) => true,
|
||||
StoreOp::DeleteBlobs(block_root) => {
|
||||
match self.get_blobs(block_root) {
|
||||
Ok(Some(blobs_sidecar)) => {
|
||||
blobs_to_delete.push(blobs_sidecar);
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
self.log, "Error getting blobs";
|
||||
"block_root" => %block_root,
|
||||
"error" => ?e
|
||||
);
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
true
|
||||
}
|
||||
StoreOp::PutBlock(_, _) | StoreOp::DeleteBlock(_) => false,
|
||||
_ => false,
|
||||
});
|
||||
|
||||
// Update database whilst holding a lock on cache, to ensure that the cache updates
|
||||
// atomically with the database.
|
||||
let mut guard = self.block_cache.lock();
|
||||
let mut guard_blob = self.blob_cache.lock();
|
||||
|
||||
for op in &batch {
|
||||
let blob_cache_ops = blobs_ops.clone();
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
// Try to execute blobs store ops.
|
||||
blobs_db.do_atomically(self.convert_to_kv_batch(blobs_ops)?)?;
|
||||
|
||||
let hot_db_cache_ops = hot_db_ops.clone();
|
||||
// Try to execute hot db store ops.
|
||||
let tx_res = match self.convert_to_kv_batch(hot_db_ops) {
|
||||
Ok(kv_store_ops) => self.hot_db.do_atomically(kv_store_ops),
|
||||
Err(e) => Err(e),
|
||||
};
|
||||
// Rollback on failure
|
||||
if let Err(e) = tx_res {
|
||||
let mut blob_cache_ops = blob_cache_ops;
|
||||
for op in blob_cache_ops.iter_mut() {
|
||||
let reverse_op = match op {
|
||||
StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(*block_root),
|
||||
StoreOp::DeleteBlobs(_) => match blobs_to_delete.pop() {
|
||||
Some(blobs) => StoreOp::PutBlobs(blobs.beacon_block_root, Arc::new(blobs)),
|
||||
None => return Err(HotColdDBError::Rollback.into()),
|
||||
},
|
||||
_ => return Err(HotColdDBError::Rollback.into()),
|
||||
};
|
||||
*op = reverse_op;
|
||||
}
|
||||
blobs_db.do_atomically(self.convert_to_kv_batch(blob_cache_ops)?)?;
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
for op in hot_db_cache_ops {
|
||||
match op {
|
||||
StoreOp::PutBlock(block_root, block) => {
|
||||
guard.put(*block_root, (**block).clone());
|
||||
guard.put(block_root, (*block).clone());
|
||||
}
|
||||
|
||||
StoreOp::PutBlobs(block_root, blobs) => {
|
||||
guard_blob.put(*block_root, (**blobs).clone());
|
||||
}
|
||||
StoreOp::PutBlobs(_, _) => (),
|
||||
|
||||
StoreOp::PutState(_, _) => (),
|
||||
|
||||
@@ -857,12 +954,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
StoreOp::DeleteStateTemporaryFlag(_) => (),
|
||||
|
||||
StoreOp::DeleteBlock(block_root) => {
|
||||
guard.pop(block_root);
|
||||
guard.pop(&block_root);
|
||||
}
|
||||
|
||||
StoreOp::DeleteBlobs(block_root) => {
|
||||
guard_blob.pop(block_root);
|
||||
}
|
||||
StoreOp::DeleteBlobs(_) => (),
|
||||
|
||||
StoreOp::DeleteState(_, _) => (),
|
||||
|
||||
@@ -874,8 +969,20 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
}
|
||||
}
|
||||
|
||||
self.hot_db
|
||||
.do_atomically(self.convert_to_kv_batch(batch)?)?;
|
||||
for op in blob_cache_ops {
|
||||
match op {
|
||||
StoreOp::PutBlobs(block_root, blobs) => {
|
||||
guard_blob.put(block_root, (*blobs).clone());
|
||||
}
|
||||
|
||||
StoreOp::DeleteBlobs(block_root) => {
|
||||
guard_blob.pop(&block_root);
|
||||
}
|
||||
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
||||
drop(guard);
|
||||
drop(guard_blob);
|
||||
|
||||
@@ -1212,6 +1319,22 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
})
|
||||
}
|
||||
|
||||
/// Fetch a blobs sidecar from the store.
|
||||
pub fn get_blobs(&self, block_root: &Hash256) -> Result<Option<BlobsSidecar<E>>, Error> {
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
|
||||
match blobs_db.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? {
|
||||
Some(ref blobs_bytes) => {
|
||||
let blobs = BlobsSidecar::from_ssz_bytes(blobs_bytes)?;
|
||||
// FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks,
|
||||
// may want to attempt to use one again
|
||||
self.blob_cache.lock().put(*block_root, blobs.clone());
|
||||
Ok(Some(blobs))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a reference to the `ChainSpec` used by the database.
|
||||
pub fn get_chain_spec(&self) -> &ChainSpec {
|
||||
&self.spec
|
||||
@@ -1713,7 +1836,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
}
|
||||
}
|
||||
let payloads_pruned = ops.len();
|
||||
self.do_atomically(ops)?;
|
||||
self.do_atomically_with_block_and_blobs_cache(ops)?;
|
||||
info!(
|
||||
self.log,
|
||||
"Execution payload pruning complete";
|
||||
@@ -1862,16 +1985,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
}
|
||||
}
|
||||
let blobs_sidecars_pruned = ops.len();
|
||||
|
||||
let update_blob_info = self.compare_and_set_blob_info(
|
||||
blob_info,
|
||||
BlobInfo {
|
||||
oldest_blob_slot: Some(end_slot + 1),
|
||||
},
|
||||
)?;
|
||||
let new_blob_info = BlobInfo {
|
||||
oldest_blob_slot: Some(end_slot + 1),
|
||||
blobs_db: blob_info.blobs_db,
|
||||
};
|
||||
let update_blob_info = self.compare_and_set_blob_info(blob_info, new_blob_info)?;
|
||||
ops.push(StoreOp::KeyValueOp(update_blob_info));
|
||||
|
||||
self.do_atomically(ops)?;
|
||||
self.do_atomically_with_block_and_blobs_cache(ops)?;
|
||||
info!(
|
||||
self.log,
|
||||
"Blobs sidecar pruning complete";
|
||||
@@ -2011,7 +2132,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
}
|
||||
|
||||
// Delete the states from the hot database if we got this far.
|
||||
store.do_atomically(hot_db_ops)?;
|
||||
store.do_atomically_with_block_and_blobs_cache(hot_db_ops)?;
|
||||
|
||||
debug!(
|
||||
store.log,
|
||||
|
||||
@@ -101,6 +101,7 @@ pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
#[derive(Clone)]
|
||||
pub enum KeyValueStoreOp {
|
||||
PutKeyValue(Vec<u8>, Vec<u8>),
|
||||
DeleteKey(Vec<u8>),
|
||||
@@ -154,6 +155,7 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'stati
|
||||
|
||||
/// Reified key-value storage operation. Helps in modifying the storage atomically.
|
||||
/// See also https://github.com/sigp/lighthouse/issues/692
|
||||
#[derive(Clone)]
|
||||
pub enum StoreOp<'a, E: EthSpec> {
|
||||
PutBlock(Hash256, Arc<SignedBeaconBlock<E>>),
|
||||
PutState(Hash256, &'a BeaconState<E>),
|
||||
|
||||
@@ -124,6 +124,8 @@ impl StoreItem for AnchorInfo {
|
||||
pub struct BlobInfo {
|
||||
/// The slot after which blobs are available (>=).
|
||||
pub oldest_blob_slot: Option<Slot>,
|
||||
/// A separate blobs database is in use.
|
||||
pub blobs_db: bool,
|
||||
}
|
||||
|
||||
impl StoreItem for BlobInfo {
|
||||
|
||||
Reference in New Issue
Block a user