diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs
index 30ee66074f..bbcc043e74 100644
--- a/beacon_node/store/src/errors.rs
+++ b/beacon_node/store/src/errors.rs
@@ -25,6 +25,8 @@ pub enum Error {
     SchemaMigrationError(String),
     /// The store's `anchor_info` was mutated concurrently, the latest modification wasn't applied.
     AnchorInfoConcurrentMutation,
+    /// The store's `blob_info` was mutated concurrently, the latest modification wasn't applied.
+    BlobInfoConcurrentMutation,
     /// The block or state is unavailable due to weak subjectivity sync.
     HistoryUnavailable,
     /// State reconstruction cannot commence because not all historic blocks are known.
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index e8c782b8c5..1b579be5e9 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -12,9 +12,9 @@ use crate::leveldb_store::BytesKey;
 use crate::leveldb_store::LevelDB;
 use crate::memory_store::MemoryStore;
 use crate::metadata::{
-    AnchorInfo, CompactionTimestamp, PruningCheckpoint, SchemaVersion, ANCHOR_INFO_KEY,
-    COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION, PRUNING_CHECKPOINT_KEY,
-    SCHEMA_VERSION_KEY, SPLIT_KEY,
+    AnchorInfo, BlobInfo, CompactionTimestamp, PruningCheckpoint, SchemaVersion, ANCHOR_INFO_KEY,
+    BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION,
+    PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY,
 };
 use crate::metrics;
 use crate::{
@@ -53,6 +53,8 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
     pub(crate) split: RwLock<Split>,
     /// The starting slots for the range of blocks & states stored in the database.
     anchor_info: RwLock<Option<AnchorInfo>>,
+    /// The starting slots for the range of blobs stored in the database.
+    blob_info: RwLock<Option<BlobInfo>>,
     pub(crate) config: StoreConfig,
     /// Cold database containing compact historical data.
pub cold_db: Cold, @@ -1293,6 +1295,65 @@ impl, Cold: ItemStore> HotColdDB .map(|a| a.anchor_slot) } + /// Get a clone of the store's blob info. + /// + /// To do mutations, use `compare_and_set_blob_info`. + pub fn get_blob_info(&self) -> Option { + self.blob_info.read_recursive().clone() + } + + /// Atomically update the blob info from `prev_value` to `new_value`. + /// + /// Return a `KeyValueStoreOp` which should be written to disk, possibly atomically with other + /// values. + /// + /// Return an `BlobInfoConcurrentMutation` error if the `prev_value` provided + /// is not correct. + pub fn compare_and_set_blob_info( + &self, + prev_value: Option, + new_value: Option, + ) -> Result { + let mut blob_info = self.blob_info.write(); + if *blob_info == prev_value { + let kv_op = self.store_blob_info_in_batch(&new_value); + *blob_info = new_value; + Ok(kv_op) + } else { + Err(Error::AnchorInfoConcurrentMutation) + } + } + + /// As for `compare_and_set_blob_info`, but also writes the blob info to disk immediately. + pub fn compare_and_set_blob_info_with_write( + &self, + prev_value: Option, + new_value: Option, + ) -> Result<(), Error> { + let kv_store_op = self.compare_and_set_blob_info(prev_value, new_value)?; + self.hot_db.do_atomically(vec![kv_store_op]) + } + + /// Load the blob info from disk, but do not set `self.blob_info`. + fn load_blob_info(&self) -> Result, Error> { + self.hot_db.get(&BLOB_INFO_KEY) + } + + /// Store the given `blob_info` to disk. + /// + /// The argument is intended to be `self.blob_info`, but is passed manually to avoid issues + /// with recursive locking. + fn store_blob_info_in_batch(&self, blob_info: &Option) -> KeyValueStoreOp { + if let Some(ref blob_info) = blob_info { + blob_info.as_kv_store_op(BLOB_INFO_KEY) + } else { + KeyValueStoreOp::DeleteKey(get_key_for_col( + DBColumn::BeaconMeta.into(), + BLOB_INFO_KEY.as_bytes(), + )) + } + } + /// Return the slot-window describing the available historic states. 
/// /// Returns `(lower_limit, upper_limit)`. diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 5cb3f12200..a19f8d91d1 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -15,6 +15,7 @@ pub const SPLIT_KEY: Hash256 = Hash256::repeat_byte(2); pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3); pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4); pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5); +pub const BLOB_INFO_KEY: Hash256 = Hash256::repeat_byte(6); #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct SchemaVersion(pub u64); @@ -117,3 +118,28 @@ impl StoreItem for AnchorInfo { Ok(Self::from_ssz_bytes(bytes)?) } } + +/// Database parameters relevant to blob sync. +#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize)] +pub struct BlobInfo { + /// The block root of the next blob that needs to be added to fill in the history. + pub oldest_blob_parent: Hash256, + /// The slot before which blobs are available. + pub oldest_blob_slot: Slot, + /// The slot from which blobs are available. + pub latest_blob_slot: Slot, +} + +impl StoreItem for AnchorInfo { + fn db_column() -> DBColumn { + DBColumn::BeaconMeta + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + Ok(Self::from_ssz_bytes(bytes)?) + } +}