Persist data columns to store (#6255)

* Persist data columns (from das PR #5196)
This commit is contained in:
Jimmy Chen
2024-08-14 14:36:24 +10:00
committed by GitHub
parent 3a996fbbee
commit 18df7010c3
5 changed files with 67 additions and 23 deletions

View File

@@ -9,6 +9,7 @@ use state_processing::{
use std::borrow::Cow;
use std::iter;
use std::time::Duration;
use store::metadata::DataColumnInfo;
use store::{chunked_vector::BlockRoots, AnchorInfo, BlobInfo, ChunkWriter, KeyValueStore};
use types::{Hash256, Slot};
@@ -66,6 +67,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.get_anchor_info()
.ok_or(HistoricalBlockError::NoAnchorInfo)?;
let blob_info = self.store.get_blob_info();
let data_column_info = self.store.get_data_column_info();
// Take all blocks with slots less than the oldest block slot.
let num_relevant = blocks.partition_point(|available_block| {
@@ -90,18 +92,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(0);
}
let n_blobs_lists_to_import = blocks_to_import
// Blobs are stored per block, and data columns are each stored individually
let n_blob_ops_per_block = if self.spec.is_peer_das_scheduled() {
self.data_availability_checker.get_custody_columns_count()
} else {
1
};
let blob_batch_size = blocks_to_import
.iter()
.filter(|available_block| available_block.blobs().is_some())
.count();
.count()
.saturating_mul(n_blob_ops_per_block);
let mut expected_block_root = anchor_info.oldest_block_parent;
let mut prev_block_slot = anchor_info.oldest_block_slot;
let mut chunk_writer =
ChunkWriter::<BlockRoots, _, _>::new(&self.store.cold_db, prev_block_slot.as_usize())?;
let mut new_oldest_blob_slot = blob_info.oldest_blob_slot;
let mut new_oldest_data_column_slot = data_column_info.oldest_data_column_slot;
let mut blob_batch = Vec::with_capacity(n_blobs_lists_to_import);
let mut blob_batch = Vec::with_capacity(blob_batch_size);
let mut cold_batch = Vec::with_capacity(blocks_to_import.len());
let mut hot_batch = Vec::with_capacity(blocks_to_import.len());
let mut signed_blocks = Vec::with_capacity(blocks_to_import.len());
@@ -129,11 +140,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.blobs_as_kv_store_ops(&block_root, blobs, &mut blob_batch);
}
// Store the data columns too
if let Some(_data_columns) = maybe_data_columns {
// TODO(das): depends on https://github.com/sigp/lighthouse/pull/6073
// new_oldest_data_column_slot = Some(block.slot());
// self.store
// .data_columns_as_kv_store_ops(&block_root, data_columns, &mut blob_batch);
if let Some(data_columns) = maybe_data_columns {
new_oldest_data_column_slot = Some(block.slot());
self.store
.data_columns_as_kv_store_ops(&block_root, data_columns, &mut blob_batch);
}
// Store block roots, including at all skip slots in the freezer DB.
@@ -212,7 +222,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.store.hot_db.do_atomically(hot_batch)?;
self.store.cold_db.do_atomically(cold_batch)?;
let mut anchor_and_blob_batch = Vec::with_capacity(2);
let mut anchor_and_blob_batch = Vec::with_capacity(3);
// Update the blob info.
if new_oldest_blob_slot != blob_info.oldest_blob_slot {
@@ -228,6 +238,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
// Update the data column info.
if new_oldest_data_column_slot != data_column_info.oldest_data_column_slot {
if let Some(oldest_data_column_slot) = new_oldest_data_column_slot {
let new_data_column_info = DataColumnInfo {
oldest_data_column_slot: Some(oldest_data_column_slot),
};
anchor_and_blob_batch.push(
self.store
.compare_and_set_data_column_info(data_column_info, new_data_column_info)?,
);
}
}
// Update the anchor.
let new_anchor = AnchorInfo {
oldest_block_slot: prev_block_slot,