mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-19 12:56:12 +00:00
Fix rebase conflicts
This commit is contained in:
@@ -31,7 +31,7 @@ where
|
||||
"Garbage collecting {} temporary states",
|
||||
delete_ops.len() / 2
|
||||
);
|
||||
self.do_atomically_and_update_cache(delete_ops, None)?;
|
||||
self.do_atomically_with_block_and_blobs_cache(delete_ops)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -544,7 +544,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
}
|
||||
|
||||
pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobsSidecar<E>) -> Result<(), Error> {
|
||||
self.hot_db.put_bytes(
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
blobs_db.put_bytes(
|
||||
DBColumn::BeaconBlob.into(),
|
||||
block_root.as_bytes(),
|
||||
&blobs.as_ssz_bytes(),
|
||||
@@ -849,19 +850,38 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
Ok(key_value_batch)
|
||||
}
|
||||
|
||||
pub fn do_atomically_and_update_cache(
|
||||
pub fn do_atomically_with_block_and_blobs_cache(
|
||||
&self,
|
||||
batch: Vec<StoreOp<E>>,
|
||||
blobs_batch: Option<Vec<StoreOp<E>>>,
|
||||
) -> Result<(), Error> {
|
||||
// Update the block cache whilst holding a lock, to ensure that the cache updates atomically
|
||||
// with the database.
|
||||
let mut guard = self.block_cache.lock();
|
||||
let mut hot_db_cache_ops = Vec::new();
|
||||
|
||||
for op in &batch {
|
||||
let (blobs_ops, hot_db_ops) = batch.into_iter().partition(|store_op| match store_op {
|
||||
StoreOp::PutBlobs(_, _) | StoreOp::DeleteBlobs(_) => true,
|
||||
StoreOp::PutBlock(_, _) | StoreOp::DeleteBlock(_) => {
|
||||
hot_db_cache_ops.push(store_op.clone());
|
||||
false
|
||||
}
|
||||
_ => false,
|
||||
});
|
||||
|
||||
// Update database whilst holding a lock on cache, to ensure that the cache updates
|
||||
// atomically with the database.
|
||||
let mut guard = self.block_cache.lock();
|
||||
let mut guard_blob = self.blob_cache.lock();
|
||||
|
||||
self.hot_db
|
||||
.do_atomically(self.convert_to_kv_batch(hot_db_ops)?)?;
|
||||
|
||||
let blob_cache_ops = blobs_ops.clone();
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
// todo(emhane): do we want to restore the hot db writes if this fails?
|
||||
blobs_db.do_atomically(self.convert_to_kv_batch(blobs_ops)?)?;
|
||||
|
||||
for op in hot_db_cache_ops {
|
||||
match op {
|
||||
StoreOp::PutBlock(block_root, block) => {
|
||||
guard.put(*block_root, (**block).clone());
|
||||
guard.put(block_root, (*block).clone());
|
||||
}
|
||||
|
||||
StoreOp::PutBlobs(_, _) => (),
|
||||
@@ -875,7 +895,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
StoreOp::DeleteStateTemporaryFlag(_) => (),
|
||||
|
||||
StoreOp::DeleteBlock(block_root) => {
|
||||
guard.pop(block_root);
|
||||
guard.pop(&block_root);
|
||||
}
|
||||
|
||||
StoreOp::DeleteBlobs(_) => (),
|
||||
@@ -890,39 +910,22 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
}
|
||||
}
|
||||
|
||||
let guard_blob = match blobs_batch {
|
||||
Some(blob_ops) => {
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
// Update the blob cache whilst holding a lock, while holding a lock on the block
|
||||
// cache, to ensure they and their databases all update atomically.
|
||||
let mut guard_blob = self.blob_cache.lock();
|
||||
|
||||
for op in &blob_ops {
|
||||
match op {
|
||||
StoreOp::PutBlobs(block_root, blobs) => {
|
||||
guard_blob.put(*block_root, (**blobs).clone());
|
||||
}
|
||||
|
||||
StoreOp::DeleteBlobs(block_root) => {
|
||||
guard_blob.pop(block_root);
|
||||
}
|
||||
|
||||
_ => (),
|
||||
}
|
||||
for op in blob_cache_ops {
|
||||
match op {
|
||||
StoreOp::PutBlobs(block_root, blobs) => {
|
||||
guard_blob.put(block_root, (*blobs).clone());
|
||||
}
|
||||
blobs_db.do_atomically(self.convert_to_kv_batch(blob_ops)?)?;
|
||||
Some(guard_blob)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
self.hot_db
|
||||
.do_atomically(self.convert_to_kv_batch(batch)?)?;
|
||||
StoreOp::DeleteBlobs(block_root) => {
|
||||
guard_blob.pop(&block_root);
|
||||
}
|
||||
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
||||
drop(guard);
|
||||
if let Some(guard_blob) = guard_blob {
|
||||
drop(guard_blob);
|
||||
}
|
||||
drop(guard_blob);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1774,7 +1777,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
}
|
||||
}
|
||||
let payloads_pruned = ops.len();
|
||||
self.do_atomically_and_update_cache(ops, None)?;
|
||||
self.do_atomically_with_block_and_blobs_cache(ops)?;
|
||||
info!(
|
||||
self.log,
|
||||
"Execution payload pruning complete";
|
||||
@@ -2073,7 +2076,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
}
|
||||
|
||||
// Delete the states from the hot database if we got this far.
|
||||
store.do_atomically_and_update_cache(hot_db_ops, None)?;
|
||||
store.do_atomically_with_block_and_blobs_cache(hot_db_ops)?;
|
||||
|
||||
debug!(
|
||||
store.log,
|
||||
|
||||
@@ -154,6 +154,7 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'stati
|
||||
|
||||
/// Reified key-value storage operation. Helps in modifying the storage atomically.
|
||||
/// See also https://github.com/sigp/lighthouse/issues/692
|
||||
#[derive(Clone)]
|
||||
pub enum StoreOp<'a, E: EthSpec> {
|
||||
PutBlock(Hash256, Arc<SignedBeaconBlock<E>>),
|
||||
PutState(Hash256, &'a BeaconState<E>),
|
||||
|
||||
Reference in New Issue
Block a user