mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-17 04:48:21 +00:00
Merge remote-tracking branch 'origin/unstable' into tree-states
This commit is contained in:
@@ -38,7 +38,7 @@ use std::collections::VecDeque;
|
||||
use std::io::{Read, Write};
|
||||
use std::marker::PhantomData;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use types::blob_sidecar::BlobSidecarList;
|
||||
@@ -69,7 +69,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
||||
/// Cold database containing compact historical data.
|
||||
pub cold_db: Cold,
|
||||
/// Database containing blobs. If None, store falls back to use `cold_db`.
|
||||
pub blobs_db: Option<Cold>,
|
||||
pub blobs_db: Cold,
|
||||
/// Hot database containing duplicated but quick-to-access recent data.
|
||||
///
|
||||
/// The hot database also contains all blocks.
|
||||
@@ -211,7 +211,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
|
||||
anchor_info: RwLock::new(None),
|
||||
blob_info: RwLock::new(BlobInfo::default()),
|
||||
cold_db: MemoryStore::open(),
|
||||
blobs_db: Some(MemoryStore::open()),
|
||||
blobs_db: MemoryStore::open(),
|
||||
hot_db: MemoryStore::open(),
|
||||
block_cache: Mutex::new(BlockCache::new(block_cache_size.get())),
|
||||
state_cache: Mutex::new(StateCache::new(state_cache_size)),
|
||||
@@ -237,7 +237,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
||||
pub fn open(
|
||||
hot_path: &Path,
|
||||
cold_path: &Path,
|
||||
blobs_db_path: Option<PathBuf>,
|
||||
blobs_db_path: &Path,
|
||||
migrate_schema: impl FnOnce(Arc<Self>, SchemaVersion, SchemaVersion) -> Result<(), Error>,
|
||||
config: StoreConfig,
|
||||
spec: ChainSpec,
|
||||
@@ -256,12 +256,12 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
||||
let diff_buffer_cache_size =
|
||||
NonZeroUsize::new(config.diff_buffer_cache_size).ok_or(Error::ZeroCacheSize)?;
|
||||
|
||||
let mut db = HotColdDB {
|
||||
let db = HotColdDB {
|
||||
split: RwLock::new(Split::default()),
|
||||
anchor_info: RwLock::new(None),
|
||||
blob_info: RwLock::new(BlobInfo::default()),
|
||||
cold_db: LevelDB::open(cold_path)?,
|
||||
blobs_db: None,
|
||||
blobs_db: LevelDB::open(blobs_db_path)?,
|
||||
hot_db: LevelDB::open(hot_path)?,
|
||||
block_cache: Mutex::new(BlockCache::new(block_cache_size.get())),
|
||||
state_cache: Mutex::new(StateCache::new(state_cache_size)),
|
||||
@@ -310,37 +310,29 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
||||
Some(blob_info) => {
|
||||
// If the oldest block slot is already set do not allow the blob DB path to be
|
||||
// changed (require manual migration).
|
||||
if blob_info.oldest_blob_slot.is_some() {
|
||||
if blobs_db_path.is_some() && !blob_info.blobs_db {
|
||||
return Err(HotColdDBError::BlobsPreviouslyInDefaultStore.into());
|
||||
} else if blobs_db_path.is_none() && blob_info.blobs_db {
|
||||
return Err(HotColdDBError::MissingPathToBlobsDatabase.into());
|
||||
}
|
||||
if blob_info.oldest_blob_slot.is_some() && !blob_info.blobs_db {
|
||||
return Err(HotColdDBError::BlobsPreviouslyInDefaultStore.into());
|
||||
}
|
||||
// Set the oldest blob slot to the Deneb fork slot if it is not yet set.
|
||||
// Always initialize `blobs_db` to true, we no longer support storing the blobs
|
||||
// in the freezer DB, because the UX is strictly worse for relocating the DB.
|
||||
let oldest_blob_slot = blob_info.oldest_blob_slot.or(deneb_fork_slot);
|
||||
BlobInfo {
|
||||
oldest_blob_slot,
|
||||
blobs_db: blobs_db_path.is_some(),
|
||||
blobs_db: true,
|
||||
}
|
||||
}
|
||||
// First start.
|
||||
None => BlobInfo {
|
||||
// Set the oldest blob slot to the Deneb fork slot if it is not yet set.
|
||||
oldest_blob_slot: deneb_fork_slot,
|
||||
blobs_db: blobs_db_path.is_some(),
|
||||
blobs_db: true,
|
||||
},
|
||||
};
|
||||
if new_blob_info.blobs_db {
|
||||
if let Some(path) = &blobs_db_path {
|
||||
db.blobs_db = Some(LevelDB::open(path.as_path())?);
|
||||
}
|
||||
}
|
||||
db.compare_and_set_blob_info_with_write(<_>::default(), new_blob_info.clone())?;
|
||||
info!(
|
||||
db.log,
|
||||
"Blob DB initialized";
|
||||
"separate_db" => new_blob_info.blobs_db,
|
||||
"path" => ?blobs_db_path,
|
||||
"oldest_blob_slot" => ?new_blob_info.oldest_blob_slot,
|
||||
);
|
||||
@@ -489,9 +481,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
}
|
||||
|
||||
// Load the blinded block.
|
||||
let blinded_block = match self.get_blinded_block(block_root, slot)? {
|
||||
Some(block) => block,
|
||||
None => return Ok(None),
|
||||
let Some(blinded_block) = self.get_blinded_block(block_root, slot)? else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
// If the block is after the split point then we should have the full execution payload
|
||||
@@ -614,12 +605,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
&self,
|
||||
slot: Slot,
|
||||
) -> Result<Option<SignedBlindedBeaconBlock<E>>, Error> {
|
||||
let bytes = if let Some(bytes) = self.cold_db.get_bytes(
|
||||
let Some(bytes) = self.cold_db.get_bytes(
|
||||
DBColumn::BeaconBlockFrozen.into(),
|
||||
&slot.as_u64().to_be_bytes(),
|
||||
)? {
|
||||
bytes
|
||||
} else {
|
||||
)?
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
@@ -748,8 +738,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
|
||||
/// Check if the blobs for a block exists on disk.
|
||||
pub fn blobs_exist(&self, block_root: &Hash256) -> Result<bool, Error> {
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
blobs_db.key_exists(DBColumn::BeaconBlob.into(), block_root.as_bytes())
|
||||
self.blobs_db
|
||||
.key_exists(DBColumn::BeaconBlob.into(), block_root.as_bytes())
|
||||
}
|
||||
|
||||
/// Determine whether a block exists in the database (hot *or* cold).
|
||||
@@ -769,13 +759,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
.key_delete(DBColumn::BeaconBlock.into(), block_root.as_bytes())?;
|
||||
self.hot_db
|
||||
.key_delete(DBColumn::ExecPayload.into(), block_root.as_bytes())?;
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
blobs_db.key_delete(DBColumn::BeaconBlob.into(), block_root.as_bytes())
|
||||
self.blobs_db
|
||||
.key_delete(DBColumn::BeaconBlob.into(), block_root.as_bytes())
|
||||
}
|
||||
|
||||
pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobSidecarList<E>) -> Result<(), Error> {
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
blobs_db.put_bytes(
|
||||
self.blobs_db.put_bytes(
|
||||
DBColumn::BeaconBlob.into(),
|
||||
block_root.as_bytes(),
|
||||
&blobs.as_ssz_bytes(),
|
||||
@@ -1137,9 +1126,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
let mut guard = self.block_cache.lock();
|
||||
|
||||
let blob_cache_ops = blobs_ops.clone();
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
// Try to execute blobs store ops.
|
||||
blobs_db.do_atomically(self.convert_to_kv_batch(blobs_ops)?)?;
|
||||
self.blobs_db
|
||||
.do_atomically(self.convert_to_kv_batch(blobs_ops)?)?;
|
||||
|
||||
let hot_db_cache_ops = hot_db_ops.clone();
|
||||
// Try to execute hot db store ops.
|
||||
@@ -1167,7 +1156,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
};
|
||||
*op = reverse_op;
|
||||
}
|
||||
blobs_db.do_atomically(self.convert_to_kv_batch(blob_cache_ops)?)?;
|
||||
self.blobs_db
|
||||
.do_atomically(self.convert_to_kv_batch(blob_cache_ops)?)?;
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
@@ -1361,9 +1351,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
return self.load_hot_state_full(state_root).map(Some);
|
||||
}
|
||||
|
||||
let target_summary = if let Some(summary) = self.load_hot_state_summary(state_root)? {
|
||||
summary
|
||||
} else {
|
||||
let Some(target_summary) = self.load_hot_state_summary(state_root)? else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
@@ -1997,15 +1985,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
|
||||
/// Fetch blobs for a given block from the store.
|
||||
pub fn get_blobs(&self, block_root: &Hash256) -> Result<Option<BlobSidecarList<E>>, Error> {
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
|
||||
// Check the cache.
|
||||
if let Some(blobs) = self.block_cache.lock().get_blobs(block_root) {
|
||||
metrics::inc_counter(&metrics::BEACON_BLOBS_CACHE_HIT_COUNT);
|
||||
return Ok(Some(blobs.clone()));
|
||||
}
|
||||
|
||||
match blobs_db.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? {
|
||||
match self
|
||||
.blobs_db
|
||||
.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())?
|
||||
{
|
||||
Some(ref blobs_bytes) => {
|
||||
let blobs = BlobSidecarList::from_ssz_bytes(blobs_bytes)?;
|
||||
self.block_cache
|
||||
@@ -2177,7 +2166,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
});
|
||||
let blob_info = BlobInfo {
|
||||
oldest_blob_slot,
|
||||
blobs_db: self.blobs_db.is_some(),
|
||||
blobs_db: true,
|
||||
};
|
||||
self.compare_and_set_blob_info(self.get_blob_info(), blob_info)
|
||||
}
|
||||
@@ -2562,12 +2551,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
|
||||
/// Try to prune blobs, approximating the current epoch from the split slot.
|
||||
pub fn try_prune_most_blobs(&self, force: bool) -> Result<(), Error> {
|
||||
let deneb_fork_epoch = match self.spec.deneb_fork_epoch {
|
||||
Some(epoch) => epoch,
|
||||
None => {
|
||||
debug!(self.log, "Deneb fork is disabled");
|
||||
return Ok(());
|
||||
}
|
||||
let Some(deneb_fork_epoch) = self.spec.deneb_fork_epoch else {
|
||||
debug!(self.log, "Deneb fork is disabled");
|
||||
return Ok(());
|
||||
};
|
||||
// The current epoch is >= split_epoch + 2. It could be greater if the database is
|
||||
// configured to delay updating the split or finalization has ceased. In this instance we
|
||||
@@ -2729,6 +2715,90 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Delete *all* states from the freezer database and update the anchor accordingly.
|
||||
///
|
||||
/// WARNING: this method deletes the genesis state and replaces it with the provided
|
||||
/// `genesis_state`. This is to support its use in schema migrations where the storage scheme of
|
||||
/// the genesis state may be modified. It is the responsibility of the caller to ensure that the
|
||||
/// genesis state is correct, else a corrupt database will be created.
|
||||
pub fn prune_historic_states(
|
||||
&self,
|
||||
genesis_state_root: Hash256,
|
||||
genesis_state: &BeaconState<E>,
|
||||
) -> Result<(), Error> {
|
||||
// Update the anchor to use the dummy state upper limit and disable historic state storage.
|
||||
let old_anchor = self.get_anchor_info();
|
||||
let new_anchor = if let Some(old_anchor) = old_anchor.clone() {
|
||||
AnchorInfo {
|
||||
state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
|
||||
state_lower_limit: Slot::new(0),
|
||||
..old_anchor.clone()
|
||||
}
|
||||
} else {
|
||||
AnchorInfo {
|
||||
anchor_slot: Slot::new(0),
|
||||
oldest_block_slot: Slot::new(0),
|
||||
oldest_block_parent: Hash256::zero(),
|
||||
state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
|
||||
state_lower_limit: Slot::new(0),
|
||||
}
|
||||
};
|
||||
|
||||
// Commit the anchor change immediately: if the cold database ops fail they can always be
|
||||
// retried, and we can't do them atomically with this change anyway.
|
||||
self.compare_and_set_anchor_info_with_write(old_anchor, Some(new_anchor))?;
|
||||
|
||||
// Stage freezer data for deletion. Do not bother loading and deserializing values as this
|
||||
// wastes time and is less schema-agnostic. My hope is that this method will be useful for
|
||||
// migrating to the tree-states schema (delete everything in the freezer then start afresh).
|
||||
let mut cold_ops = vec![];
|
||||
|
||||
let columns = [
|
||||
DBColumn::BeaconState,
|
||||
DBColumn::BeaconStateSummary,
|
||||
DBColumn::BeaconRestorePoint,
|
||||
DBColumn::BeaconStateRoots,
|
||||
DBColumn::BeaconHistoricalRoots,
|
||||
DBColumn::BeaconRandaoMixes,
|
||||
DBColumn::BeaconHistoricalSummaries,
|
||||
];
|
||||
|
||||
for column in columns {
|
||||
for res in self.cold_db.iter_column_keys::<Vec<u8>>(column) {
|
||||
let key = res?;
|
||||
cold_ops.push(KeyValueStoreOp::DeleteKey(get_key_for_col(
|
||||
column.as_str(),
|
||||
&key,
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
// XXX: We need to commit the mass deletion here *before* re-storing the genesis state, as
|
||||
// the current schema performs reads as part of `store_cold_state`. This can be deleted
|
||||
// once the target schema is tree-states. If the process is killed before the genesis state
|
||||
// is written this can be fixed by re-running.
|
||||
info!(
|
||||
self.log,
|
||||
"Deleting historic states";
|
||||
"num_kv" => cold_ops.len(),
|
||||
);
|
||||
self.cold_db.do_atomically(std::mem::take(&mut cold_ops))?;
|
||||
|
||||
// If we just deleted the the genesis state, re-store it using the *current* schema, which
|
||||
// may be different from the schema of the genesis state we just deleted.
|
||||
if self.get_split_slot() > 0 {
|
||||
info!(
|
||||
self.log,
|
||||
"Re-storing genesis state";
|
||||
"state_root" => ?genesis_state_root,
|
||||
);
|
||||
self.store_cold_state(&genesis_state_root, genesis_state, &mut cold_ops)?;
|
||||
self.cold_db.do_atomically(cold_ops)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Advance the split point of the store, moving new finalized states to the freezer.
|
||||
|
||||
@@ -227,9 +227,9 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
|
||||
}
|
||||
|
||||
/// Iterate through all keys and values in a particular column.
|
||||
fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter {
|
||||
fn iter_column_keys<K: Key>(&self, column: DBColumn) -> ColumnKeyIter<K> {
|
||||
let start_key =
|
||||
BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes()));
|
||||
BytesKey::from_vec(get_key_for_col(column.into(), &vec![0; column.key_size()]));
|
||||
|
||||
let iter = self.db.keys_iter(self.read_options());
|
||||
iter.seek(&start_key);
|
||||
@@ -237,13 +237,12 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
|
||||
Box::new(
|
||||
iter.take_while(move |key| key.matches_column(column))
|
||||
.map(move |bytes_key| {
|
||||
let key =
|
||||
bytes_key
|
||||
.remove_column(column)
|
||||
.ok_or(HotColdDBError::IterationError {
|
||||
unexpected_key: bytes_key,
|
||||
})?;
|
||||
Ok(key)
|
||||
let key = bytes_key.remove_column_variable(column).ok_or_else(|| {
|
||||
HotColdDBError::IterationError {
|
||||
unexpected_key: bytes_key.clone(),
|
||||
}
|
||||
})?;
|
||||
K::from_bytes(key)
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -44,7 +44,7 @@ pub use types::*;
|
||||
pub use validator_pubkey_cache::ValidatorPubkeyCache;
|
||||
|
||||
pub type ColumnIter<'a, K> = Box<dyn Iterator<Item = Result<(K, Vec<u8>), Error>> + 'a>;
|
||||
pub type ColumnKeyIter<'a> = Box<dyn Iterator<Item = Result<Hash256, Error>> + 'a>;
|
||||
pub type ColumnKeyIter<'a, K> = Box<dyn Iterator<Item = Result<K, Error>> + 'a>;
|
||||
|
||||
pub type RawEntryIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>), Error>> + 'a>;
|
||||
pub type RawKeyIter<'a> = Box<dyn Iterator<Item = Result<Vec<u8>, Error>> + 'a>;
|
||||
@@ -87,6 +87,7 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
|
||||
self.iter_column_from(column, &vec![0; column.key_size()])
|
||||
}
|
||||
|
||||
/// Iterate through all keys and values in a column from a given starting point.
|
||||
fn iter_column_from<K: Key>(&self, column: DBColumn, from: &[u8]) -> ColumnIter<K>;
|
||||
|
||||
fn iter_raw_entries(&self, _column: DBColumn, _prefix: &[u8]) -> RawEntryIter {
|
||||
@@ -98,7 +99,7 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
|
||||
}
|
||||
|
||||
/// Iterate through all keys in a particular column.
|
||||
fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter;
|
||||
fn iter_column_keys<K: Key>(&self, column: DBColumn) -> ColumnKeyIter<K>;
|
||||
}
|
||||
|
||||
pub trait Key: Sized + 'static {
|
||||
@@ -289,7 +290,7 @@ impl DBColumn {
|
||||
/// This function returns the number of bytes used by keys in a given column.
|
||||
pub fn key_size(self) -> usize {
|
||||
match self {
|
||||
Self::OverflowLRUCache => 40,
|
||||
Self::OverflowLRUCache => 33, // See `OverflowKey` encode impl.
|
||||
Self::BeaconMeta
|
||||
| Self::BeaconBlock
|
||||
| Self::BeaconState
|
||||
|
||||
@@ -100,7 +100,7 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
|
||||
}))
|
||||
}
|
||||
|
||||
fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter {
|
||||
fn iter_column_keys<K: Key>(&self, column: DBColumn) -> ColumnKeyIter<K> {
|
||||
Box::new(self.iter_column(column).map(|res| res.map(|(k, _)| k)))
|
||||
}
|
||||
|
||||
|
||||
@@ -135,7 +135,7 @@ pub struct BlobInfo {
|
||||
/// If the `oldest_blob_slot` is `None` then this means that the Deneb fork epoch is not yet
|
||||
/// known.
|
||||
pub oldest_blob_slot: Option<Slot>,
|
||||
/// A separate blobs database is in use.
|
||||
/// A separate blobs database is in use (deprecated, always `true`).
|
||||
pub blobs_db: bool,
|
||||
}
|
||||
|
||||
|
||||
@@ -20,9 +20,7 @@ where
|
||||
self: &Arc<Self>,
|
||||
num_blocks: Option<usize>,
|
||||
) -> Result<(), Error> {
|
||||
let mut anchor = if let Some(anchor) = self.get_anchor_info() {
|
||||
anchor
|
||||
} else {
|
||||
let Some(mut anchor) = self.get_anchor_info() else {
|
||||
// Nothing to do, history is complete.
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user