mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-28 18:23:38 +00:00
Merge remote-tracking branch 'origin/deneb-free-blobs' into tree-states
This commit is contained in:
@@ -15,6 +15,8 @@ pub const DEFAULT_COMPRESSION_LEVEL: i32 = 1;
|
||||
pub const DEFAULT_DIFF_BUFFER_CACHE_SIZE: usize = 16;
|
||||
const EST_COMPRESSION_FACTOR: usize = 2;
|
||||
pub const DEFAULT_HISTORIC_STATE_CACHE_SIZE: usize = 1;
|
||||
pub const DEFAULT_EPOCHS_PER_BLOB_PRUNE: u64 = 1;
|
||||
pub const DEFAULT_BLOB_PUNE_MARGIN_EPOCHS: u64 = 0;
|
||||
|
||||
/// Database configuration parameters.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -43,6 +45,13 @@ pub struct StoreConfig {
|
||||
pub linear_restore_points: bool,
|
||||
/// State diff hierarchy.
|
||||
pub hierarchy_config: HierarchyConfig,
|
||||
/// Whether to prune blobs older than the blob data availability boundary.
|
||||
pub prune_blobs: bool,
|
||||
/// Frequency of blob pruning in epochs. Default: 1 (every epoch).
|
||||
pub epochs_per_blob_prune: u64,
|
||||
/// The margin for blob pruning in epochs. The oldest blobs are pruned up until
|
||||
/// data_availability_boundary - blob_prune_margin_epochs. Default: 0.
|
||||
pub blob_prune_margin_epochs: u64,
|
||||
}
|
||||
|
||||
/// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params.
|
||||
@@ -70,6 +79,7 @@ pub enum StoreConfigError {
|
||||
epochs_per_state_diff: u64,
|
||||
max_supported: u64,
|
||||
},
|
||||
ZeroEpochsPerBlobPrune,
|
||||
}
|
||||
|
||||
impl Default for StoreConfig {
|
||||
@@ -87,6 +97,9 @@ impl Default for StoreConfig {
|
||||
linear_blocks: true,
|
||||
linear_restore_points: true,
|
||||
hierarchy_config: HierarchyConfig::default(),
|
||||
prune_blobs: true,
|
||||
epochs_per_blob_prune: DEFAULT_EPOCHS_PER_BLOB_PRUNE,
|
||||
blob_prune_margin_epochs: DEFAULT_BLOB_PUNE_MARGIN_EPOCHS,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -116,6 +129,7 @@ impl StoreConfig {
|
||||
/// Check that the configuration is valid.
|
||||
pub fn verify<E: EthSpec>(&self) -> Result<(), StoreConfigError> {
|
||||
self.verify_compression_level()?;
|
||||
self.verify_epochs_per_blob_prune()?;
|
||||
self.verify_epochs_per_state_diff::<E>()
|
||||
}
|
||||
|
||||
@@ -145,6 +159,19 @@ impl StoreConfig {
|
||||
}
|
||||
}
|
||||
|
||||
/// Check that epochs_per_blob_prune is at least 1 epoch to avoid attempting to prune the same
|
||||
/// epochs over and over again.
|
||||
fn verify_epochs_per_blob_prune(
|
||||
&self,
|
||||
epochs_per_blob_prune: u64,
|
||||
) -> Result<(), StoreConfigError> {
|
||||
if self.epochs_per_blob_prune > 0 {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(StoreConfigError::ZeroEpochsPerBlobPrune)
|
||||
}
|
||||
}
|
||||
|
||||
/// Estimate the size of `len` bytes after compression at the current compression level.
|
||||
pub fn estimate_compressed_size(&self, len: usize) -> usize {
|
||||
if self.compression_level == 0 {
|
||||
|
||||
@@ -24,6 +24,8 @@ pub enum Error {
|
||||
SchemaMigrationError(String),
|
||||
/// The store's `anchor_info` was mutated concurrently, the latest modification wasn't applied.
|
||||
AnchorInfoConcurrentMutation,
|
||||
/// The store's `blob_info` was mutated concurrently, the latest modification wasn't applied.
|
||||
BlobInfoConcurrentMutation,
|
||||
/// The block or state is unavailable due to weak subjectivity sync.
|
||||
HistoryUnavailable,
|
||||
/// State reconstruction cannot commence because not all historic blocks are known.
|
||||
|
||||
@@ -202,7 +202,7 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
|
||||
column: DBColumn,
|
||||
start_slot: Slot,
|
||||
end_slot: Option<Slot>,
|
||||
get_state: impl FnOnce() -> (BeaconState<E>, Hash256),
|
||||
get_state: impl FnOnce() -> Result<(BeaconState<E>, Hash256)>,
|
||||
) -> Result<Self> {
|
||||
use HybridForwardsIterator::*;
|
||||
|
||||
@@ -225,7 +225,7 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
|
||||
if end_slot.map_or(false, |end_slot| end_slot < freezer_upper_limit) {
|
||||
None
|
||||
} else {
|
||||
Some(Box::new(get_state()))
|
||||
Some(Box::new(get_state()?))
|
||||
};
|
||||
PreFinalization {
|
||||
iter,
|
||||
@@ -236,7 +236,7 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
|
||||
}
|
||||
} else {
|
||||
PostFinalizationLazy {
|
||||
continuation_data: Some(Box::new(get_state())),
|
||||
continuation_data: Some(Box::new(get_state()?)),
|
||||
store,
|
||||
start_slot,
|
||||
column,
|
||||
|
||||
@@ -31,7 +31,7 @@ where
|
||||
"Garbage collecting {} temporary states",
|
||||
delete_ops.len() / 2
|
||||
);
|
||||
self.do_atomically(delete_ops)?;
|
||||
self.do_atomically_with_block_and_blobs_cache(delete_ops)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -10,9 +10,9 @@ use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator};
|
||||
use crate::leveldb_store::{BytesKey, LevelDB};
|
||||
use crate::memory_store::MemoryStore;
|
||||
use crate::metadata::{
|
||||
AnchorInfo, CompactionTimestamp, PruningCheckpoint, SchemaVersion, ANCHOR_INFO_KEY,
|
||||
COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION, PRUNING_CHECKPOINT_KEY,
|
||||
SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
|
||||
AnchorInfo, BlobInfo, CompactionTimestamp, PruningCheckpoint, SchemaVersion, ANCHOR_INFO_KEY,
|
||||
BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION,
|
||||
PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
|
||||
};
|
||||
use crate::metrics;
|
||||
use crate::state_cache::{PutStateOutcome, StateCache};
|
||||
@@ -38,9 +38,11 @@ use std::collections::VecDeque;
|
||||
use std::io::{Read, Write};
|
||||
use std::marker::PhantomData;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::path::Path;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use types::blob_sidecar::BlobSidecarList;
|
||||
use types::consts::deneb::MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS;
|
||||
use types::EthSpec;
|
||||
use types::*;
|
||||
use zstd::{Decoder, Encoder};
|
||||
@@ -60,16 +62,20 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
||||
pub(crate) split: RwLock<Split>,
|
||||
/// The starting slots for the range of blocks & states stored in the database.
|
||||
anchor_info: RwLock<Option<AnchorInfo>>,
|
||||
/// The starting slots for the range of blobs stored in the database.
|
||||
blob_info: RwLock<BlobInfo>,
|
||||
pub(crate) config: StoreConfig,
|
||||
pub(crate) hierarchy: HierarchyModuli,
|
||||
/// Cold database containing compact historical data.
|
||||
pub cold_db: Cold,
|
||||
/// Database containing blobs. If None, store falls back to use `cold_db`.
|
||||
pub blobs_db: Option<Cold>,
|
||||
/// Hot database containing duplicated but quick-to-access recent data.
|
||||
///
|
||||
/// The hot database also contains all blocks.
|
||||
pub hot_db: Hot,
|
||||
/// LRU cache of deserialized blocks. Updated whenever a block is loaded.
|
||||
block_cache: Mutex<LruCache<Hash256, SignedBeaconBlock<E>>>,
|
||||
/// LRU cache of deserialized blocks and blobs. Updated whenever a block or blob is loaded.
|
||||
block_cache: Mutex<BlockCache<E>>,
|
||||
/// Cache of beacon states.
|
||||
state_cache: Mutex<StateCache<E>>,
|
||||
/// Immutable validator cache.
|
||||
@@ -80,8 +86,6 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
||||
historic_state_cache: Mutex<LruCache<Slot, BeaconState<E>>>,
|
||||
/// Cache of hierarchical diff buffers.
|
||||
diff_buffer_cache: Mutex<LruCache<Slot, HDiffBuffer>>,
|
||||
// Cache of hierarchical diffs.
|
||||
// FIXME(sproul): see if this is necessary
|
||||
/// Chain spec.
|
||||
pub(crate) spec: ChainSpec,
|
||||
/// Logger.
|
||||
@@ -90,6 +94,43 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
||||
_phantom: PhantomData<E>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct BlockCache<E: EthSpec> {
|
||||
block_cache: LruCache<Hash256, SignedBeaconBlock<E>>,
|
||||
blob_cache: LruCache<Hash256, BlobSidecarList<E>>,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> BlockCache<E> {
|
||||
pub fn new(size: usize) -> Self {
|
||||
Self {
|
||||
block_cache: LruCache::new(size),
|
||||
blob_cache: LruCache::new(size),
|
||||
}
|
||||
}
|
||||
pub fn put_block(&mut self, block_root: Hash256, block: SignedBeaconBlock<E>) {
|
||||
self.block_cache.put(block_root, block);
|
||||
}
|
||||
pub fn put_blobs(&mut self, block_root: Hash256, blobs: BlobSidecarList<E>) {
|
||||
self.blob_cache.put(block_root, blobs);
|
||||
}
|
||||
pub fn get_block<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a SignedBeaconBlock<E>> {
|
||||
self.block_cache.get(block_root)
|
||||
}
|
||||
pub fn get_blobs<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a BlobSidecarList<E>> {
|
||||
self.blob_cache.get(block_root)
|
||||
}
|
||||
pub fn delete_block(&mut self, block_root: &Hash256) {
|
||||
let _ = self.block_cache.pop(block_root);
|
||||
}
|
||||
pub fn delete_blobs(&mut self, block_root: &Hash256) {
|
||||
let _ = self.blob_cache.pop(block_root);
|
||||
}
|
||||
pub fn delete(&mut self, block_root: &Hash256) {
|
||||
let _ = self.block_cache.pop(block_root);
|
||||
let _ = self.blob_cache.pop(block_root);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum HotColdDBError {
|
||||
UnsupportedSchemaVersion {
|
||||
@@ -119,12 +160,21 @@ pub enum HotColdDBError {
|
||||
MissingAnchorInfo,
|
||||
MissingFrozenBlockSlot(Hash256),
|
||||
MissingFrozenBlock(Slot),
|
||||
MissingPathToBlobsDatabase,
|
||||
BlobsPreviouslyInDefaultStore,
|
||||
HotStateSummaryError(BeaconStateError),
|
||||
RestorePointDecodeError(ssz::DecodeError),
|
||||
BlockReplayBeaconError(BeaconStateError),
|
||||
BlockReplaySlotError(SlotProcessingError),
|
||||
BlockReplayBlockError(BlockProcessingError),
|
||||
MissingLowerLimitState(Slot),
|
||||
InvalidSlotsPerRestorePoint {
|
||||
slots_per_restore_point: u64,
|
||||
slots_per_historical_root: u64,
|
||||
slots_per_epoch: u64,
|
||||
},
|
||||
ZeroEpochsPerBlobPrune,
|
||||
BlobPruneLogicError,
|
||||
RestorePointBlockHashError(BeaconStateError),
|
||||
IterationError {
|
||||
unexpected_key: BytesKey,
|
||||
@@ -134,6 +184,7 @@ pub enum HotColdDBError {
|
||||
request_slot: Slot,
|
||||
block_root: Hash256,
|
||||
},
|
||||
Rollback,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
|
||||
@@ -158,9 +209,11 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
|
||||
let db = HotColdDB {
|
||||
split: RwLock::new(Split::default()),
|
||||
anchor_info: RwLock::new(None),
|
||||
blob_info: RwLock::new(BlobInfo::default()),
|
||||
cold_db: MemoryStore::open(),
|
||||
blobs_db: Some(MemoryStore::open()),
|
||||
hot_db: MemoryStore::open(),
|
||||
block_cache: Mutex::new(LruCache::new(block_cache_size.get())),
|
||||
block_cache: Mutex::new(BlockCache::new(config.block_cache_size.get())),
|
||||
state_cache: Mutex::new(StateCache::new(state_cache_size)),
|
||||
immutable_validators: Arc::new(RwLock::new(Default::default())),
|
||||
historic_state_cache: Mutex::new(LruCache::new(historic_state_cache_size.get())),
|
||||
@@ -184,6 +237,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
||||
pub fn open(
|
||||
hot_path: &Path,
|
||||
cold_path: &Path,
|
||||
blobs_db_path: Option<PathBuf>,
|
||||
migrate_schema: impl FnOnce(Arc<Self>, SchemaVersion, SchemaVersion) -> Result<(), Error>,
|
||||
config: StoreConfig,
|
||||
spec: ChainSpec,
|
||||
@@ -205,9 +259,11 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
||||
let db = HotColdDB {
|
||||
split: RwLock::new(Split::default()),
|
||||
anchor_info: RwLock::new(None),
|
||||
blob_info: RwLock::new(BlobInfo::default()),
|
||||
cold_db: LevelDB::open(cold_path)?,
|
||||
blobs_db: None,
|
||||
hot_db: LevelDB::open(hot_path)?,
|
||||
block_cache: Mutex::new(LruCache::new(block_cache_size.get())),
|
||||
block_cache: Mutex::new(BlockCache::new(config.block_cache_size.get())),
|
||||
state_cache: Mutex::new(StateCache::new(state_cache_size)),
|
||||
immutable_validators: Arc::new(RwLock::new(Default::default())),
|
||||
historic_state_cache: Mutex::new(LruCache::new(historic_state_cache_size.get())),
|
||||
@@ -243,6 +299,52 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
||||
let pubkey_cache = ValidatorPubkeyCache::load_from_store(&db)?;
|
||||
*db.immutable_validators.write() = pubkey_cache;
|
||||
|
||||
// Open separate blobs directory if configured and same configuration was used on previous
|
||||
// run.
|
||||
let blob_info = db.load_blob_info()?;
|
||||
let deneb_fork_slot = db
|
||||
.spec
|
||||
.deneb_fork_epoch
|
||||
.map(|epoch| epoch.start_slot(E::slots_per_epoch()));
|
||||
let new_blob_info = match &blob_info {
|
||||
Some(blob_info) => {
|
||||
// If the oldest block slot is already set do not allow the blob DB path to be
|
||||
// changed (require manual migration).
|
||||
if blob_info.oldest_blob_slot.is_some() {
|
||||
if blobs_db_path.is_some() && !blob_info.blobs_db {
|
||||
return Err(HotColdDBError::BlobsPreviouslyInDefaultStore.into());
|
||||
} else if blobs_db_path.is_none() && blob_info.blobs_db {
|
||||
return Err(HotColdDBError::MissingPathToBlobsDatabase.into());
|
||||
}
|
||||
}
|
||||
// Set the oldest blob slot to the Deneb fork slot if it is not yet set.
|
||||
let oldest_blob_slot = blob_info.oldest_blob_slot.or(deneb_fork_slot);
|
||||
BlobInfo {
|
||||
oldest_blob_slot,
|
||||
blobs_db: blobs_db_path.is_some(),
|
||||
}
|
||||
}
|
||||
// First start.
|
||||
None => BlobInfo {
|
||||
// Set the oldest blob slot to the Deneb fork slot if it is not yet set.
|
||||
oldest_blob_slot: deneb_fork_slot,
|
||||
blobs_db: blobs_db_path.is_some(),
|
||||
},
|
||||
};
|
||||
if new_blob_info.blobs_db {
|
||||
if let Some(path) = &blobs_db_path {
|
||||
db.blobs_db = Some(LevelDB::open(path.as_path())?);
|
||||
}
|
||||
}
|
||||
db.compare_and_set_blob_info_with_write(<_>::default(), new_blob_info.clone())?;
|
||||
info!(
|
||||
db.log,
|
||||
"Blob DB initialized";
|
||||
"separate_db" => new_blob_info.blobs_db,
|
||||
"path" => ?blobs_db_path,
|
||||
"oldest_blob_slot" => ?new_blob_info.oldest_blob_slot,
|
||||
);
|
||||
|
||||
// Ensure that the schema version of the on-disk database matches the software.
|
||||
// If the version is mismatched, an automatic migration will be attempted.
|
||||
let db = Arc::new(db);
|
||||
@@ -328,7 +430,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
self.hot_db.do_atomically(ops)?;
|
||||
|
||||
// Update cache.
|
||||
self.block_cache.lock().put(*block_root, block);
|
||||
self.block_cache.lock().put_block(*block_root, block);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -381,7 +483,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
metrics::inc_counter(&metrics::BEACON_BLOCK_GET_COUNT);
|
||||
|
||||
// Check the cache.
|
||||
if let Some(block) = self.block_cache.lock().get(block_root) {
|
||||
if let Some(block) = self.block_cache.lock().get_block(block_root) {
|
||||
metrics::inc_counter(&metrics::BEACON_BLOCK_CACHE_HIT_COUNT);
|
||||
return Ok(Some(DatabaseBlock::Full(block.clone())));
|
||||
}
|
||||
@@ -406,7 +508,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
let full_block = self.make_full_block(block_root, blinded_block)?;
|
||||
|
||||
// Add to cache.
|
||||
self.block_cache.lock().put(*block_root, full_block.clone());
|
||||
self.block_cache
|
||||
.lock()
|
||||
.put_block(*block_root, full_block.clone());
|
||||
|
||||
DatabaseBlock::Full(full_block)
|
||||
} else if !self.config.prune_payloads {
|
||||
@@ -642,6 +746,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
.do_atomically(vec![execution_payload.as_kv_store_op(*block_root)?])
|
||||
}
|
||||
|
||||
/// Check if the blobs for a block exists on disk.
|
||||
pub fn blobs_exist(&self, block_root: &Hash256) -> Result<bool, Error> {
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
blobs_db.key_exists(DBColumn::BeaconBlob.into(), block_root.as_bytes())
|
||||
}
|
||||
|
||||
/// Determine whether a block exists in the database (hot *or* cold).
|
||||
pub fn block_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
|
||||
Ok(self
|
||||
@@ -654,11 +764,34 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
|
||||
/// Delete a block from the store and the block cache.
|
||||
pub fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> {
|
||||
self.block_cache.lock().pop(block_root);
|
||||
self.block_cache.lock().delete(block_root);
|
||||
self.hot_db
|
||||
.key_delete(DBColumn::BeaconBlock.into(), block_root.as_bytes())?;
|
||||
self.hot_db
|
||||
.key_delete(DBColumn::ExecPayload.into(), block_root.as_bytes())
|
||||
.key_delete(DBColumn::ExecPayload.into(), block_root.as_bytes())?;
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
blobs_db.key_delete(DBColumn::BeaconBlob.into(), block_root.as_bytes())
|
||||
}
|
||||
|
||||
pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobSidecarList<E>) -> Result<(), Error> {
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
blobs_db.put_bytes(
|
||||
DBColumn::BeaconBlob.into(),
|
||||
block_root.as_bytes(),
|
||||
&blobs.as_ssz_bytes(),
|
||||
)?;
|
||||
self.block_cache.lock().put_blobs(*block_root, blobs);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn blobs_as_kv_store_ops(
|
||||
&self,
|
||||
key: &Hash256,
|
||||
blobs: BlobSidecarList<E>,
|
||||
ops: &mut Vec<KeyValueStoreOp>,
|
||||
) {
|
||||
let db_key = get_key_for_col(DBColumn::BeaconBlob.into(), key.as_bytes());
|
||||
ops.push(KeyValueStoreOp::PutKeyValue(db_key, blobs.as_ssz_bytes()));
|
||||
}
|
||||
|
||||
pub fn put_state_summary(
|
||||
@@ -831,7 +964,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
DBColumn::BeaconBlockRoots,
|
||||
start_slot,
|
||||
None,
|
||||
|| (end_state, end_block_root),
|
||||
|| Ok((end_state, end_block_root)),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -839,7 +972,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
&self,
|
||||
start_slot: Slot,
|
||||
end_slot: Slot,
|
||||
get_state: impl FnOnce() -> (BeaconState<E>, Hash256),
|
||||
get_state: impl FnOnce() -> Result<(BeaconState<E>, Hash256), Error>,
|
||||
) -> Result<HybridForwardsBlockRootsIterator<E, Hot, Cold>, Error> {
|
||||
HybridForwardsBlockRootsIterator::new(
|
||||
self,
|
||||
@@ -861,7 +994,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
DBColumn::BeaconStateRoots,
|
||||
start_slot,
|
||||
None,
|
||||
|| (end_state, end_state_root),
|
||||
|| Ok((end_state, end_state_root)),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -869,7 +1002,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
&self,
|
||||
start_slot: Slot,
|
||||
end_slot: Slot,
|
||||
get_state: impl FnOnce() -> (BeaconState<E>, Hash256),
|
||||
get_state: impl FnOnce() -> Result<(BeaconState<E>, Hash256), Error>,
|
||||
) -> Result<HybridForwardsStateRootsIterator<E, Hot, Cold>, Error> {
|
||||
HybridForwardsStateRootsIterator::new(
|
||||
self,
|
||||
@@ -908,6 +1041,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
)?;
|
||||
}
|
||||
|
||||
StoreOp::PutBlobs(block_root, blobs) => {
|
||||
self.blobs_as_kv_store_ops(&block_root, blobs, &mut key_value_batch);
|
||||
}
|
||||
|
||||
StoreOp::PutState(state_root, state) => {
|
||||
self.store_hot_state(&state_root, state, &mut key_value_batch)?;
|
||||
}
|
||||
@@ -927,6 +1064,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
|
||||
}
|
||||
|
||||
StoreOp::DeleteBlobs(block_root) => {
|
||||
let key = get_key_for_col(DBColumn::BeaconBlob.into(), block_root.as_bytes());
|
||||
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
|
||||
}
|
||||
|
||||
StoreOp::DeleteState(state_root, slot) => {
|
||||
let state_summary_key =
|
||||
get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_bytes());
|
||||
@@ -959,17 +1101,81 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
Ok(key_value_batch)
|
||||
}
|
||||
|
||||
pub fn do_atomically(&self, batch: Vec<StoreOp<E>>) -> Result<(), Error> {
|
||||
// Update the block cache whilst holding a lock, to ensure that the cache updates atomically
|
||||
// with the database.
|
||||
let mut block_cache = self.block_cache.lock();
|
||||
pub fn do_atomically_with_block_and_blobs_cache(
|
||||
&self,
|
||||
batch: Vec<StoreOp<E>>,
|
||||
) -> Result<(), Error> {
|
||||
let mut blobs_to_delete = Vec::new();
|
||||
let (blobs_ops, hot_db_ops): (Vec<StoreOp<E>>, Vec<StoreOp<E>>) =
|
||||
batch.into_iter().partition(|store_op| match store_op {
|
||||
StoreOp::PutBlobs(_, _) => true,
|
||||
StoreOp::DeleteBlobs(block_root) => {
|
||||
match self.get_blobs(block_root) {
|
||||
Ok(Some(blob_sidecar_list)) => {
|
||||
blobs_to_delete.push((*block_root, blob_sidecar_list));
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
self.log, "Error getting blobs";
|
||||
"block_root" => %block_root,
|
||||
"error" => ?e
|
||||
);
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
true
|
||||
}
|
||||
StoreOp::PutBlock(_, _) | StoreOp::DeleteBlock(_) => false,
|
||||
_ => false,
|
||||
});
|
||||
|
||||
for op in &batch {
|
||||
// Update database whilst holding a lock on cache, to ensure that the cache updates
|
||||
// atomically with the database.
|
||||
let mut guard = self.block_cache.lock();
|
||||
|
||||
let blob_cache_ops = blobs_ops.clone();
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
// Try to execute blobs store ops.
|
||||
blobs_db.do_atomically(self.convert_to_kv_batch(blobs_ops)?)?;
|
||||
|
||||
let hot_db_cache_ops = hot_db_ops.clone();
|
||||
// Try to execute hot db store ops.
|
||||
let tx_res = match self.convert_to_kv_batch(hot_db_ops) {
|
||||
Ok(kv_store_ops) => self.hot_db.do_atomically(kv_store_ops),
|
||||
Err(e) => Err(e),
|
||||
};
|
||||
// Rollback on failure
|
||||
if let Err(e) = tx_res {
|
||||
error!(
|
||||
self.log,
|
||||
"Database write failed";
|
||||
"error" => ?e,
|
||||
"action" => "reverting blob DB changes"
|
||||
);
|
||||
let mut blob_cache_ops = blob_cache_ops;
|
||||
for op in blob_cache_ops.iter_mut() {
|
||||
let reverse_op = match op {
|
||||
StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(*block_root),
|
||||
StoreOp::DeleteBlobs(_) => match blobs_to_delete.pop() {
|
||||
Some((block_root, blobs)) => StoreOp::PutBlobs(block_root, blobs),
|
||||
None => return Err(HotColdDBError::Rollback.into()),
|
||||
},
|
||||
_ => return Err(HotColdDBError::Rollback.into()),
|
||||
};
|
||||
*op = reverse_op;
|
||||
}
|
||||
blobs_db.do_atomically(self.convert_to_kv_batch(blob_cache_ops)?)?;
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
for op in hot_db_cache_ops {
|
||||
match op {
|
||||
StoreOp::PutBlock(block_root, block) => {
|
||||
block_cache.put(*block_root, (**block).clone());
|
||||
guard.put_block(block_root, (*block).clone());
|
||||
}
|
||||
|
||||
StoreOp::PutBlobs(_, _) => (),
|
||||
|
||||
StoreOp::PutState(_, _) => (),
|
||||
|
||||
StoreOp::PutStateTemporaryFlag(_) => (),
|
||||
@@ -977,7 +1183,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
StoreOp::DeleteStateTemporaryFlag(_) => (),
|
||||
|
||||
StoreOp::DeleteBlock(block_root) => {
|
||||
block_cache.pop(block_root);
|
||||
guard.delete_block(&block_root);
|
||||
self.state_cache.lock().delete_block_states(block_root);
|
||||
}
|
||||
|
||||
@@ -985,6 +1191,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
self.state_cache.lock().delete_state(state_root)
|
||||
}
|
||||
|
||||
StoreOp::DeleteBlobs(_) => (),
|
||||
|
||||
StoreOp::DeleteState(_, _) => (),
|
||||
|
||||
StoreOp::DeleteExecutionPayload(_) => (),
|
||||
|
||||
StoreOp::KeyValueOp(_) => (),
|
||||
@@ -993,7 +1203,22 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
|
||||
self.hot_db
|
||||
.do_atomically(self.convert_to_kv_batch(batch)?)?;
|
||||
drop(block_cache);
|
||||
|
||||
for op in blob_cache_ops {
|
||||
match op {
|
||||
StoreOp::PutBlobs(block_root, blobs) => {
|
||||
guard.put_blobs(block_root, blobs);
|
||||
}
|
||||
|
||||
StoreOp::DeleteBlobs(block_root) => {
|
||||
guard.delete_blobs(&block_root);
|
||||
}
|
||||
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
||||
drop(guard);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1772,6 +1997,28 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
})
|
||||
}
|
||||
|
||||
/// Fetch blobs for a given block from the store.
|
||||
pub fn get_blobs(&self, block_root: &Hash256) -> Result<Option<BlobSidecarList<E>>, Error> {
|
||||
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
|
||||
|
||||
// Check the cache.
|
||||
if let Some(blobs) = self.block_cache.lock().get_blobs(block_root) {
|
||||
metrics::inc_counter(&metrics::BEACON_BLOBS_CACHE_HIT_COUNT);
|
||||
return Ok(Some(blobs.clone()));
|
||||
}
|
||||
|
||||
match blobs_db.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? {
|
||||
Some(ref blobs_bytes) => {
|
||||
let blobs = BlobSidecarList::from_ssz_bytes(blobs_bytes)?;
|
||||
self.block_cache
|
||||
.lock()
|
||||
.put_blobs(*block_root, blobs.clone());
|
||||
Ok(Some(blobs))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a reference to the `ChainSpec` used by the database.
|
||||
pub fn get_chain_spec(&self) -> &ChainSpec {
|
||||
&self.spec
|
||||
@@ -1925,6 +2172,70 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
.map(|a| a.anchor_slot)
|
||||
}
|
||||
|
||||
/// Initialize the `BlobInfo` when starting from genesis or a checkpoint.
|
||||
pub fn init_blob_info(&self, anchor_slot: Slot) -> Result<KeyValueStoreOp, Error> {
|
||||
let oldest_blob_slot = self.spec.deneb_fork_epoch.map(|fork_epoch| {
|
||||
std::cmp::max(anchor_slot, fork_epoch.start_slot(E::slots_per_epoch()))
|
||||
});
|
||||
let blob_info = BlobInfo {
|
||||
oldest_blob_slot,
|
||||
blobs_db: self.blobs_db.is_some(),
|
||||
};
|
||||
self.compare_and_set_blob_info(self.get_blob_info(), blob_info)
|
||||
}
|
||||
|
||||
/// Get a clone of the store's blob info.
|
||||
///
|
||||
/// To do mutations, use `compare_and_set_blob_info`.
|
||||
pub fn get_blob_info(&self) -> BlobInfo {
|
||||
self.blob_info.read_recursive().clone()
|
||||
}
|
||||
|
||||
/// Atomically update the blob info from `prev_value` to `new_value`.
|
||||
///
|
||||
/// Return a `KeyValueStoreOp` which should be written to disk, possibly atomically with other
|
||||
/// values.
|
||||
///
|
||||
/// Return an `BlobInfoConcurrentMutation` error if the `prev_value` provided
|
||||
/// is not correct.
|
||||
pub fn compare_and_set_blob_info(
|
||||
&self,
|
||||
prev_value: BlobInfo,
|
||||
new_value: BlobInfo,
|
||||
) -> Result<KeyValueStoreOp, Error> {
|
||||
let mut blob_info = self.blob_info.write();
|
||||
if *blob_info == prev_value {
|
||||
let kv_op = self.store_blob_info_in_batch(&new_value);
|
||||
*blob_info = new_value;
|
||||
Ok(kv_op)
|
||||
} else {
|
||||
Err(Error::BlobInfoConcurrentMutation)
|
||||
}
|
||||
}
|
||||
|
||||
/// As for `compare_and_set_blob_info`, but also writes the blob info to disk immediately.
|
||||
pub fn compare_and_set_blob_info_with_write(
|
||||
&self,
|
||||
prev_value: BlobInfo,
|
||||
new_value: BlobInfo,
|
||||
) -> Result<(), Error> {
|
||||
let kv_store_op = self.compare_and_set_blob_info(prev_value, new_value)?;
|
||||
self.hot_db.do_atomically(vec![kv_store_op])
|
||||
}
|
||||
|
||||
/// Load the blob info from disk, but do not set `self.blob_info`.
|
||||
fn load_blob_info(&self) -> Result<Option<BlobInfo>, Error> {
|
||||
self.hot_db.get(&BLOB_INFO_KEY)
|
||||
}
|
||||
|
||||
/// Store the given `blob_info` to disk.
|
||||
///
|
||||
/// The argument is intended to be `self.blob_info`, but is passed manually to avoid issues
|
||||
/// with recursive locking.
|
||||
fn store_blob_info_in_batch(&self, blob_info: &BlobInfo) -> KeyValueStoreOp {
|
||||
blob_info.as_kv_store_op(BLOB_INFO_KEY)
|
||||
}
|
||||
|
||||
/// Return the slot-window describing the available historic states.
|
||||
///
|
||||
/// Returns `(lower_limit, upper_limit)`.
|
||||
@@ -2242,7 +2553,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
}
|
||||
}
|
||||
let payloads_pruned = ops.len();
|
||||
self.do_atomically(ops)?;
|
||||
self.do_atomically_with_block_and_blobs_cache(ops)?;
|
||||
info!(
|
||||
self.log,
|
||||
"Execution payload pruning complete";
|
||||
@@ -2250,6 +2561,181 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Try to prune blobs, approximating the current epoch from the split slot.
|
||||
pub fn try_prune_most_blobs(&self, force: bool) -> Result<(), Error> {
|
||||
let deneb_fork_epoch = match self.spec.deneb_fork_epoch {
|
||||
Some(epoch) => epoch,
|
||||
None => {
|
||||
debug!(self.log, "Deneb fork is disabled");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
// The current epoch is >= split_epoch + 2. It could be greater if the database is
|
||||
// configured to delay updating the split or finalization has ceased. In this instance we
|
||||
// choose to also delay the pruning of blobs (we never prune without finalization anyway).
|
||||
let min_current_epoch = self.get_split_slot().epoch(E::slots_per_epoch()) + 2;
|
||||
let min_data_availability_boundary = std::cmp::max(
|
||||
deneb_fork_epoch,
|
||||
min_current_epoch.saturating_sub(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
|
||||
);
|
||||
|
||||
self.try_prune_blobs(force, min_data_availability_boundary)
|
||||
}
|
||||
|
||||
/// Try to prune blobs older than the data availability boundary.
|
||||
///
|
||||
/// Blobs from the epoch `data_availability_boundary - blob_prune_margin_epochs` are retained.
|
||||
/// This epoch is an _exclusive_ endpoint for the pruning process.
|
||||
///
|
||||
/// This function only supports pruning blobs older than the split point, which is older than
|
||||
/// (or equal to) finalization. Pruning blobs newer than finalization is not supported.
|
||||
///
|
||||
/// This function also assumes that the split is stationary while it runs. It should only be
|
||||
/// run from the migrator thread (where `migrate_database` runs) or the database manager.
|
||||
pub fn try_prune_blobs(
|
||||
&self,
|
||||
force: bool,
|
||||
data_availability_boundary: Epoch,
|
||||
) -> Result<(), Error> {
|
||||
if self.spec.deneb_fork_epoch.is_none() {
|
||||
debug!(self.log, "Deneb fork is disabled");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let pruning_enabled = self.get_config().prune_blobs;
|
||||
let margin_epochs = self.get_config().blob_prune_margin_epochs;
|
||||
let epochs_per_blob_prune = self.get_config().epochs_per_blob_prune;
|
||||
|
||||
if !force && !pruning_enabled {
|
||||
debug!(
|
||||
self.log,
|
||||
"Blob pruning is disabled";
|
||||
"prune_blobs" => pruning_enabled
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let blob_info = self.get_blob_info();
|
||||
let Some(oldest_blob_slot) = blob_info.oldest_blob_slot else {
|
||||
error!(self.log, "Slot of oldest blob is not known");
|
||||
return Err(HotColdDBError::BlobPruneLogicError.into());
|
||||
};
|
||||
|
||||
// Start pruning from the epoch of the oldest blob stored.
|
||||
// The start epoch is inclusive (blobs in this epoch will be pruned).
|
||||
let start_epoch = oldest_blob_slot.epoch(E::slots_per_epoch());
|
||||
|
||||
// Prune blobs up until the `data_availability_boundary - margin` or the split
|
||||
// slot's epoch, whichever is older. We can't prune blobs newer than the split.
|
||||
// The end epoch is also inclusive (blobs in this epoch will be pruned).
|
||||
let split = self.get_split_info();
|
||||
let end_epoch = std::cmp::min(
|
||||
data_availability_boundary - margin_epochs - 1,
|
||||
split.slot.epoch(E::slots_per_epoch()) - 1,
|
||||
);
|
||||
let end_slot = end_epoch.end_slot(E::slots_per_epoch());
|
||||
|
||||
let can_prune = end_epoch != 0 && start_epoch <= end_epoch;
|
||||
let should_prune = start_epoch + epochs_per_blob_prune <= end_epoch + 1;
|
||||
|
||||
if !force && !should_prune || !can_prune {
|
||||
debug!(
|
||||
self.log,
|
||||
"Blobs are pruned";
|
||||
"oldest_blob_slot" => oldest_blob_slot,
|
||||
"data_availability_boundary" => data_availability_boundary,
|
||||
"split_slot" => split.slot,
|
||||
"end_epoch" => end_epoch,
|
||||
"start_epoch" => start_epoch,
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Sanity checks.
|
||||
if let Some(anchor) = self.get_anchor_info() {
|
||||
if oldest_blob_slot < anchor.oldest_block_slot {
|
||||
error!(
|
||||
self.log,
|
||||
"Oldest blob is older than oldest block";
|
||||
"oldest_blob_slot" => oldest_blob_slot,
|
||||
"oldest_block_slot" => anchor.oldest_block_slot
|
||||
);
|
||||
return Err(HotColdDBError::BlobPruneLogicError.into());
|
||||
}
|
||||
}
|
||||
|
||||
// Iterate block roots forwards from the oldest blob slot.
|
||||
debug!(
|
||||
self.log,
|
||||
"Pruning blobs";
|
||||
"start_epoch" => start_epoch,
|
||||
"end_epoch" => end_epoch,
|
||||
"data_availability_boundary" => data_availability_boundary,
|
||||
);
|
||||
|
||||
let mut ops = vec![];
|
||||
let mut last_pruned_block_root = None;
|
||||
|
||||
for res in self.forwards_block_roots_iterator_until(
|
||||
oldest_blob_slot,
|
||||
end_slot,
|
||||
|| {
|
||||
let (_, split_state) = self
|
||||
.get_advanced_hot_state(split.block_root, split.slot, split.state_root)?
|
||||
.ok_or(HotColdDBError::MissingSplitState(
|
||||
split.state_root,
|
||||
split.slot,
|
||||
))?;
|
||||
|
||||
Ok((split_state, split.block_root))
|
||||
},
|
||||
&self.spec,
|
||||
)? {
|
||||
let (block_root, slot) = match res {
|
||||
Ok(tuple) => tuple,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
self.log,
|
||||
"Stopping blob pruning early";
|
||||
"error" => ?e,
|
||||
);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if Some(block_root) != last_pruned_block_root && self.blobs_exist(&block_root)? {
|
||||
trace!(
|
||||
self.log,
|
||||
"Pruning blobs of block";
|
||||
"slot" => slot,
|
||||
"block_root" => ?block_root,
|
||||
);
|
||||
last_pruned_block_root = Some(block_root);
|
||||
ops.push(StoreOp::DeleteBlobs(block_root));
|
||||
}
|
||||
|
||||
if slot >= end_slot {
|
||||
break;
|
||||
}
|
||||
}
|
||||
let blob_lists_pruned = ops.len();
|
||||
let new_blob_info = BlobInfo {
|
||||
oldest_blob_slot: Some(end_slot + 1),
|
||||
blobs_db: blob_info.blobs_db,
|
||||
};
|
||||
let update_blob_info = self.compare_and_set_blob_info(blob_info, new_blob_info)?;
|
||||
ops.push(StoreOp::KeyValueOp(update_blob_info));
|
||||
|
||||
self.do_atomically_with_block_and_blobs_cache(ops)?;
|
||||
debug!(
|
||||
self.log,
|
||||
"Blob pruning complete";
|
||||
"blob_lists_pruned" => blob_lists_pruned,
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Advance the split point of the store, moving new finalized states to the freezer.
|
||||
@@ -2374,7 +2860,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
}
|
||||
|
||||
// Warning: Critical section. We have to take care not to put any of the two databases in an
|
||||
// inconsistent state if the OS process dies at any point during the freezeing
|
||||
// inconsistent state if the OS process dies at any point during the freezing
|
||||
// procedure.
|
||||
//
|
||||
// Since it is pretty much impossible to be atomic across more than one database, we trade
|
||||
@@ -2391,7 +2877,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
let mut split_guard = store.split.write();
|
||||
let latest_split_slot = split_guard.slot;
|
||||
|
||||
// Detect a sitation where the split point is (erroneously) changed from more than one
|
||||
// Detect a situation where the split point is (erroneously) changed from more than one
|
||||
// place in code.
|
||||
if latest_split_slot != current_split_slot {
|
||||
error!(
|
||||
@@ -2424,7 +2910,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
}
|
||||
|
||||
// Delete the states from the hot database if we got this far.
|
||||
store.do_atomically(hot_db_ops)?;
|
||||
store.do_atomically_with_block_and_blobs_cache(hot_db_ops)?;
|
||||
|
||||
// Update the cache's view of the finalized state.
|
||||
store.update_finalized_state(
|
||||
@@ -2443,7 +2929,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
}
|
||||
|
||||
/// Struct for storing the split slot and state root in the database.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Default, Encode, Decode, Deserialize, Serialize)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Encode, Decode, Deserialize, Serialize)]
|
||||
pub struct Split {
|
||||
pub slot: Slot,
|
||||
pub state_root: Hash256,
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
use crate::{DBColumn, Error, StoreItem};
|
||||
use ssz::{Decode, Encode};
|
||||
use types::{EthSpec, ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadMerge};
|
||||
use types::{
|
||||
BlobSidecarList, EthSpec, ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadDeneb,
|
||||
ExecutionPayloadMerge,
|
||||
};
|
||||
|
||||
macro_rules! impl_store_item {
|
||||
($ty_name:ident) => {
|
||||
@@ -21,6 +24,8 @@ macro_rules! impl_store_item {
|
||||
}
|
||||
impl_store_item!(ExecutionPayloadMerge);
|
||||
impl_store_item!(ExecutionPayloadCapella);
|
||||
impl_store_item!(ExecutionPayloadDeneb);
|
||||
impl_store_item!(BlobSidecarList);
|
||||
|
||||
/// This fork-agnostic implementation should be only used for writing.
|
||||
///
|
||||
@@ -36,9 +41,13 @@ impl<E: EthSpec> StoreItem for ExecutionPayload<E> {
|
||||
}
|
||||
|
||||
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
|
||||
ExecutionPayloadCapella::from_ssz_bytes(bytes)
|
||||
.map(Self::Capella)
|
||||
.or_else(|_| ExecutionPayloadMerge::from_ssz_bytes(bytes).map(Self::Merge))
|
||||
ExecutionPayloadDeneb::from_ssz_bytes(bytes)
|
||||
.map(Self::Deneb)
|
||||
.or_else(|_| {
|
||||
ExecutionPayloadCapella::from_ssz_bytes(bytes)
|
||||
.map(Self::Capella)
|
||||
.or_else(|_| ExecutionPayloadMerge::from_ssz_bytes(bytes).map(Self::Merge))
|
||||
})
|
||||
.map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -196,6 +196,36 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
|
||||
)
|
||||
}
|
||||
|
||||
fn iter_raw_entries(&self, column: DBColumn, prefix: &[u8]) -> RawEntryIter {
|
||||
let start_key = BytesKey::from_vec(get_key_for_col(column.into(), prefix));
|
||||
|
||||
let iter = self.db.iter(self.read_options());
|
||||
iter.seek(&start_key);
|
||||
|
||||
Box::new(
|
||||
iter.take_while(move |(key, _)| key.key.starts_with(start_key.key.as_slice()))
|
||||
.map(move |(bytes_key, value)| {
|
||||
let subkey = &bytes_key.key[column.as_bytes().len()..];
|
||||
Ok((Vec::from(subkey), value))
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
fn iter_raw_keys(&self, column: DBColumn, prefix: &[u8]) -> RawKeyIter {
|
||||
let start_key = BytesKey::from_vec(get_key_for_col(column.into(), prefix));
|
||||
|
||||
let iter = self.db.keys_iter(self.read_options());
|
||||
iter.seek(&start_key);
|
||||
|
||||
Box::new(
|
||||
iter.take_while(move |key| key.key.starts_with(start_key.key.as_slice()))
|
||||
.map(move |bytes_key| {
|
||||
let subkey = &bytes_key.key[column.as_bytes().len()..];
|
||||
Ok(Vec::from(subkey))
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
/// Iterate through all keys and values in a particular column.
|
||||
fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter {
|
||||
let start_key =
|
||||
|
||||
@@ -32,6 +32,7 @@ pub use self::config::StoreConfig;
|
||||
pub use self::hot_cold_store::{HotColdDB, HotStateSummary, Split};
|
||||
pub use self::leveldb_store::LevelDB;
|
||||
pub use self::memory_store::MemoryStore;
|
||||
pub use crate::metadata::BlobInfo;
|
||||
pub use errors::Error;
|
||||
pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer;
|
||||
pub use metadata::AnchorInfo;
|
||||
@@ -45,6 +46,9 @@ pub use validator_pubkey_cache::ValidatorPubkeyCache;
|
||||
pub type ColumnIter<'a, K> = Box<dyn Iterator<Item = Result<(K, Vec<u8>), Error>> + 'a>;
|
||||
pub type ColumnKeyIter<'a> = Box<dyn Iterator<Item = Result<Hash256, Error>> + 'a>;
|
||||
|
||||
pub type RawEntryIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>), Error>> + 'a>;
|
||||
pub type RawKeyIter<'a> = Box<dyn Iterator<Item = Result<Vec<u8>, Error>> + 'a>;
|
||||
|
||||
pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
|
||||
/// Retrieve some bytes in `column` with `key`.
|
||||
fn get_bytes(&self, column: &str, key: &[u8]) -> Result<Option<Vec<u8>>, Error>;
|
||||
@@ -85,6 +89,14 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
|
||||
|
||||
fn iter_column_from<K: Key>(&self, column: DBColumn, from: &[u8]) -> ColumnIter<K>;
|
||||
|
||||
fn iter_raw_entries(&self, _column: DBColumn, _prefix: &[u8]) -> RawEntryIter {
|
||||
Box::new(std::iter::empty())
|
||||
}
|
||||
|
||||
fn iter_raw_keys(&self, _column: DBColumn, _prefix: &[u8]) -> RawKeyIter {
|
||||
Box::new(std::iter::empty())
|
||||
}
|
||||
|
||||
/// Iterate through all keys in a particular column.
|
||||
fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter;
|
||||
}
|
||||
@@ -116,6 +128,7 @@ pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
#[derive(Clone)]
|
||||
pub enum KeyValueStoreOp {
|
||||
PutKeyValue(Vec<u8>, Vec<u8>),
|
||||
DeleteKey(Vec<u8>),
|
||||
@@ -169,12 +182,15 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'stati
|
||||
|
||||
/// Reified key-value storage operation. Helps in modifying the storage atomically.
|
||||
/// See also https://github.com/sigp/lighthouse/issues/692
|
||||
#[derive(Clone)]
|
||||
pub enum StoreOp<'a, E: EthSpec> {
|
||||
PutBlock(Hash256, Arc<SignedBeaconBlock<E>>),
|
||||
PutState(Hash256, &'a BeaconState<E>),
|
||||
PutBlobs(Hash256, BlobSidecarList<E>),
|
||||
PutStateTemporaryFlag(Hash256),
|
||||
DeleteStateTemporaryFlag(Hash256),
|
||||
DeleteBlock(Hash256),
|
||||
DeleteBlobs(Hash256),
|
||||
DeleteState(Hash256, Option<Slot>),
|
||||
DeleteExecutionPayload(Hash256),
|
||||
KeyValueOp(KeyValueStoreOp),
|
||||
@@ -199,6 +215,8 @@ pub enum DBColumn {
|
||||
/// - Value: ZSTD-compressed SSZ-encoded blinded block.
|
||||
#[strum(serialize = "bbf")]
|
||||
BeaconBlockFrozen,
|
||||
#[strum(serialize = "blb")]
|
||||
BeaconBlob,
|
||||
/// For full `BeaconState`s in the hot database (finalized or fork-boundary states).
|
||||
#[strum(serialize = "ste")]
|
||||
BeaconState,
|
||||
@@ -247,6 +265,8 @@ pub enum DBColumn {
|
||||
OptimisticTransitionBlock,
|
||||
#[strum(serialize = "bhs")]
|
||||
BeaconHistoricalSummaries,
|
||||
#[strum(serialize = "olc")]
|
||||
OverflowLRUCache,
|
||||
}
|
||||
|
||||
/// A block from the database, which might have an execution payload or not.
|
||||
@@ -272,6 +292,7 @@ impl DBColumn {
|
||||
Self::BeaconMeta
|
||||
| Self::BeaconBlock
|
||||
| Self::BeaconState
|
||||
| Self::BeaconBlob
|
||||
| Self::BeaconStateSummary
|
||||
| Self::BeaconStateTemporary
|
||||
| Self::ExecPayload
|
||||
|
||||
@@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use types::{Checkpoint, Hash256, Slot};
|
||||
|
||||
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(24);
|
||||
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(25);
|
||||
|
||||
// All the keys that get stored under the `BeaconMeta` column.
|
||||
//
|
||||
@@ -15,6 +15,7 @@ pub const SPLIT_KEY: Hash256 = Hash256::repeat_byte(2);
|
||||
pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3);
|
||||
pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4);
|
||||
pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5);
|
||||
pub const BLOB_INFO_KEY: Hash256 = Hash256::repeat_byte(6);
|
||||
|
||||
/// State upper limit value used to indicate that a node is not storing historic states.
|
||||
pub const STATE_UPPER_LIMIT_NO_RETAIN: Slot = Slot::new(u64::MAX);
|
||||
@@ -122,3 +123,32 @@ impl StoreItem for AnchorInfo {
|
||||
Ok(Self::from_ssz_bytes(bytes)?)
|
||||
}
|
||||
}
|
||||
|
||||
/// Database parameters relevant to blob sync.
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
|
||||
pub struct BlobInfo {
|
||||
/// The slot after which blobs are or *will be* available (>=).
|
||||
///
|
||||
/// If this slot is in the future, then it is the first slot of the Deneb fork, from which blobs
|
||||
/// will be available.
|
||||
///
|
||||
/// If the `oldest_blob_slot` is `None` then this means that the Deneb fork epoch is not yet
|
||||
/// known.
|
||||
pub oldest_blob_slot: Option<Slot>,
|
||||
/// A separate blobs database is in use.
|
||||
pub blobs_db: bool,
|
||||
}
|
||||
|
||||
impl StoreItem for BlobInfo {
|
||||
fn db_column() -> DBColumn {
|
||||
DBColumn::BeaconMeta
|
||||
}
|
||||
|
||||
fn as_store_bytes(&self) -> Vec<u8> {
|
||||
self.as_ssz_bytes()
|
||||
}
|
||||
|
||||
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
|
||||
Ok(Self::from_ssz_bytes(bytes)?)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -124,6 +124,10 @@ lazy_static! {
|
||||
"store_beacon_block_cache_hit_total",
|
||||
"Number of hits to the store's block cache"
|
||||
);
|
||||
pub static ref BEACON_BLOBS_CACHE_HIT_COUNT: Result<IntCounter> = try_create_int_counter(
|
||||
"store_beacon_blobs_cache_hit_total",
|
||||
"Number of hits to the store's blob cache"
|
||||
);
|
||||
pub static ref BEACON_BLOCK_READ_TIMES: Result<Histogram> = try_create_histogram(
|
||||
"store_beacon_block_read_overhead_seconds",
|
||||
"Overhead on reading a beacon block from the DB (e.g., decoding)"
|
||||
|
||||
@@ -54,7 +54,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> ValidatorPubkeyCache<E,
|
||||
};
|
||||
|
||||
let store_ops = cache.import_new_pubkeys(state)?;
|
||||
store.do_atomically(store_ops)?;
|
||||
store.do_atomically_with_block_and_blobs_cache(store_ops)?;
|
||||
|
||||
Ok(cache)
|
||||
}
|
||||
@@ -335,7 +335,7 @@ mod test {
|
||||
let ops = cache
|
||||
.import_new_pubkeys(&state)
|
||||
.expect("should import pubkeys");
|
||||
store.do_atomically(ops).unwrap();
|
||||
store.do_atomically_with_block_and_blobs_cache(ops).unwrap();
|
||||
check_cache_get(&cache, &keypairs[..]);
|
||||
drop(cache);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user