Data column custody info (#7648)

#7647


  Introduces a new record in the blobs db `DataColumnCustodyInfo`

When `DataColumnCustodyInfo` exists in the db this indicates that a recent cgc change has occurred and/or that a custody backfill sync is currently in progress (custody backfill will be added as a separate PR). When a cgc change has occurred `earliest_available_slot` will be equal to the slot at which the cgc change occured. During custody backfill sync`earliest_available_slot` should be updated incrementally as it progresses.

~~Note that if `advertise_false_custody_group_count` is enabled we do not add a `DataColumnCustodyInfo` record in the db as that would affect the status v2 response.~~
(See comment https://github.com/sigp/lighthouse/pull/7648#discussion_r2212403389)

~~If `DataColumnCustodyInfo` doesn't exist in the db this indicates that we have fulfilled our custody requirements up to the DA window.~~
(It now always exist, and the slot will be set to `None` once backfill is complete)

StatusV2 now uses `DataColumnCustodyInfo` to calculate the `earliest_available_slot` if a `DataColumnCustodyInfo` record exists in the db, if it's `None`, then we return the `oldest_block_slot`.
This commit is contained in:
Eitan Seri-Levi
2025-07-22 15:30:30 +02:00
committed by GitHub
parent b48879a566
commit db8b6be9df
11 changed files with 185 additions and 13 deletions

View File

@@ -6807,6 +6807,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map(|duration| (next_digest_epoch, duration))
}
/// Update data column custody info with the slot at which cgc was changed.
pub fn update_data_column_custody_info(&self, slot: Option<Slot>) {
self.store
.put_data_column_custody_info(slot)
.unwrap_or_else(
|e| tracing::error!(error = ?e, "Failed to update data column custody info"),
);
}
/// This method serves to get a sense of the current chain health. It is used in block proposal
/// to determine whether we should outsource payload production duties.
///

View File

@@ -3,6 +3,7 @@ mod migration_schema_v23;
mod migration_schema_v24;
mod migration_schema_v25;
mod migration_schema_v26;
mod migration_schema_v27;
use crate::beacon_chain::BeaconChainTypes;
use std::sync::Arc;
@@ -67,6 +68,17 @@ pub fn migrate_schema<T: BeaconChainTypes>(
let ops = migration_schema_v26::downgrade_from_v26::<T>(db.clone())?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(26), SchemaVersion(27)) => {
// This migration updates the blobs db. The schema version
// is bumped inside upgrade_to_v27.
migration_schema_v27::upgrade_to_v27::<T>(db.clone())
}
(SchemaVersion(27), SchemaVersion(26)) => {
// Downgrading is essentially a no-op and is only possible
// if peer das isn't scheduled.
migration_schema_v27::downgrade_from_v27::<T>(db.clone())?;
db.store_schema_version_atomically(to, vec![])
}
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,

View File

@@ -0,0 +1,26 @@
use crate::BeaconChainTypes;
use std::sync::Arc;
use store::{metadata::SchemaVersion, Error, HotColdDB};
/// Add `DataColumnCustodyInfo` entry to v27.
pub fn upgrade_to_v27<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
) -> Result<(), Error> {
if db.spec.is_peer_das_scheduled() {
db.put_data_column_custody_info(None)?;
db.store_schema_version_atomically(SchemaVersion(27), vec![])?;
}
Ok(())
}
pub fn downgrade_from_v27<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
) -> Result<(), Error> {
if db.spec.is_peer_das_scheduled() {
return Err(Error::MigrationError(
"Cannot downgrade from v27 if peerDAS is scheduled".to_string(),
));
}
Ok(())
}

View File

@@ -217,6 +217,7 @@ impl CustodyContext {
new_custody_group_count: updated_cgc,
sampling_count: self
.num_of_custody_groups_to_sample(Some(effective_epoch), spec),
effective_epoch,
});
}
}
@@ -287,6 +288,7 @@ impl CustodyContext {
pub struct CustodyCountChanged {
pub new_custody_group_count: u64,
pub sampling_count: u64,
pub effective_epoch: Epoch,
}
/// The custody information that gets persisted across runs.

View File

@@ -9,12 +9,14 @@ use operation_pool::PersistedOperationPool;
use ssz::Encode;
use std::sync::{Arc, LazyLock};
use store::{
database::interface::BeaconNodeBackend, hot_cold_store::Split, metadata::DataColumnInfo,
database::interface::BeaconNodeBackend,
hot_cold_store::Split,
metadata::{DataColumnCustodyInfo, DataColumnInfo},
DBColumn, HotColdDB, StoreConfig, StoreItem,
};
use strum::IntoEnumIterator;
use tempfile::{tempdir, TempDir};
use types::{ChainSpec, Hash256, Keypair, MainnetEthSpec};
use types::{ChainSpec, Hash256, Keypair, MainnetEthSpec, Slot};
type E = MainnetEthSpec;
type Store<E> = Arc<HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>>;
@@ -84,11 +86,13 @@ async fn schema_stability() {
chain.persist_op_pool().unwrap();
chain.persist_custody_context().unwrap();
insert_data_column_custody_info(&store, &harness.spec);
check_db_columns();
check_metadata_sizes(&store);
check_op_pool(&store);
check_custody_context(&store, &harness.spec);
check_custody_info(&store, &harness.spec);
check_persisted_chain(&store);
// Not covered here:
@@ -100,13 +104,21 @@ async fn schema_stability() {
fn check_db_columns() {
let current_columns: Vec<&'static str> = DBColumn::iter().map(|c| c.as_str()).collect();
let expected_columns = vec![
"bma", "blk", "blb", "bdc", "ste", "hsd", "hsn", "bsn", "bsd", "bss", "bs3", "bcs", "bst",
"exp", "bch", "opo", "etc", "frk", "pkc", "brp", "bsx", "bsr", "bbx", "bbr", "bhr", "brm",
"dht", "cus", "otb", "bhs", "olc", "lcu", "scb", "scm", "dmy",
"bma", "blk", "blb", "bdc", "bdi", "ste", "hsd", "hsn", "bsn", "bsd", "bss", "bs3", "bcs",
"bst", "exp", "bch", "opo", "etc", "frk", "pkc", "brp", "bsx", "bsr", "bbx", "bbr", "bhr",
"brm", "dht", "cus", "otb", "bhs", "olc", "lcu", "scb", "scm", "dmy",
];
assert_eq!(expected_columns, current_columns);
}
fn insert_data_column_custody_info(store: &Store<E>, spec: &ChainSpec) {
if spec.is_peer_das_scheduled() {
store
.put_data_column_custody_info(Some(Slot::new(0)))
.unwrap();
}
}
/// Check the SSZ sizes of known on-disk metadata.
///
/// New types can be added here as the schema evolves.
@@ -122,6 +134,7 @@ fn check_metadata_sizes(store: &Store<E>) {
}
);
assert_eq!(DataColumnInfo::default().ssz_bytes_len(), 5);
assert_eq!(DataColumnCustodyInfo::default().ssz_bytes_len(), 5);
}
fn check_op_pool(store: &Store<E>) {
@@ -143,6 +156,15 @@ fn check_custody_context(store: &Store<E>, spec: &ChainSpec) {
}
}
fn check_custody_info(store: &Store<E>, spec: &ChainSpec) {
let data_column_custody_info = store.get_data_column_custody_info().unwrap();
if spec.is_peer_das_scheduled() {
assert_eq!(data_column_custody_info.unwrap().as_ssz_bytes().len(), 13);
} else {
assert!(data_column_custody_info.is_none());
}
}
fn check_persisted_chain(store: &Store<E>) {
let chain = store
.get_item::<PersistedBeaconChain>(&Hash256::ZERO)

View File

@@ -3157,7 +3157,11 @@ async fn schema_downgrade_to_min_version(
)
.await;
let min_version = SchemaVersion(22);
let min_version = if spec.is_fulu_scheduled() {
SchemaVersion(27)
} else {
SchemaVersion(22)
};
// Save the slot clock so that the new harness doesn't revert in time.
let slot_clock = harness.chain.slot_clock.clone();

View File

@@ -3760,7 +3760,6 @@ pub fn serve<T: BeaconChainTypes>(
.to_string(),
));
}
Ok(())
})
},
@@ -3845,6 +3844,12 @@ pub fn serve<T: BeaconChainTypes>(
current_slot,
&chain.spec,
) {
chain.update_data_column_custody_info(Some(
cgc_change
.effective_epoch
.start_slot(T::EthSpec::slots_per_epoch()),
));
network_tx.send(NetworkMessage::CustodyCountChanged {
new_custody_group_count: cgc_change.new_custody_group_count,
sampling_count: cgc_change.sampling_count,

View File

@@ -29,8 +29,22 @@ pub(crate) fn status_message<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>)
finalized_checkpoint.root = Hash256::zero();
}
let earliest_available_slot = beacon_chain.store.get_anchor_info().oldest_block_slot;
// NOTE: We are making an assumption that `get_data_column_custody_info` wont fail.
let earliest_available_data_column_slot = beacon_chain
.store
.get_data_column_custody_info()
.ok()
.flatten()
.and_then(|info| info.earliest_data_column_slot);
// If data_column_custody_info.earliest_data_column_slot is `None`,
// no recent cgc changes have occurred and no cgc backfill is in progress.
let earliest_available_slot =
if let Some(earliest_available_data_column_slot) = earliest_available_data_column_slot {
earliest_available_data_column_slot
} else {
beacon_chain.store.get_anchor_info().oldest_block_slot
};
StatusMessage::V2(StatusMessageV2 {
fork_digest,
finalized_root: finalized_checkpoint.root,

View File

@@ -6,10 +6,10 @@ use crate::historic_state_cache::HistoricStateCache;
use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator};
use crate::memory_store::MemoryStore;
use crate::metadata::{
AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnInfo, SchemaVersion, ANCHOR_INFO_KEY,
ANCHOR_UNINITIALIZED, BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY,
CURRENT_SCHEMA_VERSION, DATA_COLUMN_INFO_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY,
STATE_UPPER_LIMIT_NO_RETAIN,
AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnCustodyInfo, DataColumnInfo,
SchemaVersion, ANCHOR_INFO_KEY, ANCHOR_UNINITIALIZED, BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY,
CONFIG_KEY, CURRENT_SCHEMA_VERSION, DATA_COLUMN_CUSTODY_INFO_KEY, DATA_COLUMN_INFO_KEY,
SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
};
use crate::state_cache::{PutStateOutcome, StateCache};
use crate::{
@@ -91,6 +91,7 @@ struct BlockCache<E: EthSpec> {
block_cache: LruCache<Hash256, SignedBeaconBlock<E>>,
blob_cache: LruCache<Hash256, BlobSidecarList<E>>,
data_column_cache: LruCache<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>,
data_column_custody_info_cache: Option<DataColumnCustodyInfo>,
}
impl<E: EthSpec> BlockCache<E> {
@@ -99,6 +100,7 @@ impl<E: EthSpec> BlockCache<E> {
block_cache: LruCache::new(size),
blob_cache: LruCache::new(size),
data_column_cache: LruCache::new(size),
data_column_custody_info_cache: None,
}
}
pub fn put_block(&mut self, block_root: Hash256, block: SignedBeaconBlock<E>) {
@@ -112,6 +114,12 @@ impl<E: EthSpec> BlockCache<E> {
.get_or_insert_mut(block_root, Default::default)
.insert(data_column.index, data_column);
}
pub fn put_data_column_custody_info(
&mut self,
data_column_custody_info: Option<DataColumnCustodyInfo>,
) {
self.data_column_custody_info_cache = data_column_custody_info;
}
pub fn get_block<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a SignedBeaconBlock<E>> {
self.block_cache.get(block_root)
}
@@ -129,6 +137,9 @@ impl<E: EthSpec> BlockCache<E> {
.get(block_root)
.and_then(|map| map.get(column_index).cloned())
}
pub fn get_data_column_custody_info(&self) -> Option<DataColumnCustodyInfo> {
self.data_column_custody_info_cache.clone()
}
pub fn delete_block(&mut self, block_root: &Hash256) {
let _ = self.block_cache.pop(block_root);
}
@@ -922,6 +933,24 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
));
}
pub fn put_data_column_custody_info(
&self,
earliest_data_column_slot: Option<Slot>,
) -> Result<(), Error> {
let data_column_custody_info = DataColumnCustodyInfo {
earliest_data_column_slot,
};
self.blobs_db
.put(&DATA_COLUMN_CUSTODY_INFO_KEY, &data_column_custody_info)?;
self.block_cache
.lock()
.put_data_column_custody_info(Some(data_column_custody_info));
Ok(())
}
pub fn put_data_columns(
&self,
block_root: &Hash256,
@@ -2389,6 +2418,27 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
})
}
/// Fetch custody info from the cache.
/// If custody info doesn't exist in the cache,
/// try to fetch from the DB and prime the cache.
pub fn get_data_column_custody_info(&self) -> Result<Option<DataColumnCustodyInfo>, Error> {
let Some(data_column_custody_info) = self.block_cache.lock().get_data_column_custody_info()
else {
let data_column_custody_info = self
.blobs_db
.get::<DataColumnCustodyInfo>(&DATA_COLUMN_CUSTODY_INFO_KEY)?;
// Update the cache
self.block_cache
.lock()
.put_data_column_custody_info(data_column_custody_info.clone());
return Ok(data_column_custody_info);
};
Ok(Some(data_column_custody_info))
}
/// Fetch all columns for a given block from the store.
pub fn get_data_columns(
&self,

View File

@@ -264,6 +264,8 @@ pub enum DBColumn {
BeaconBlob,
#[strum(serialize = "bdc")]
BeaconDataColumn,
#[strum(serialize = "bdi")]
BeaconDataColumnCustodyInfo,
/// For full `BeaconState`s in the hot database (finalized or fork-boundary states).
///
/// DEPRECATED.
@@ -424,6 +426,7 @@ impl DBColumn {
| Self::CustodyContext
| Self::OptimisticTransitionBlock => 32,
Self::BeaconBlockRoots
| Self::BeaconDataColumnCustodyInfo
| Self::BeaconBlockRootsChunked
| Self::BeaconStateRoots
| Self::BeaconStateRootsChunked

View File

@@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use types::{Hash256, Slot};
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(26);
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(27);
// All the keys that get stored under the `BeaconMeta` column.
//
@@ -18,6 +18,7 @@ pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4);
pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5);
pub const BLOB_INFO_KEY: Hash256 = Hash256::repeat_byte(6);
pub const DATA_COLUMN_INFO_KEY: Hash256 = Hash256::repeat_byte(7);
pub const DATA_COLUMN_CUSTODY_INFO_KEY: Hash256 = Hash256::repeat_byte(8);
/// State upper limit value used to indicate that a node is not storing historic states.
pub const STATE_UPPER_LIMIT_NO_RETAIN: Slot = Slot::new(u64::MAX);
@@ -204,6 +205,30 @@ impl StoreItem for BlobInfo {
}
}
/// Database parameter relevant to data column custody sync. There is only at most a single
/// `DataColumnCustodyInfo` stored in the db. `earliest_data_column_slot` is updated when cgc
/// count changes and is updated incrementally during data column custody backfill. Once custody backfill
/// is complete `earliest_data_column_slot` is set to `None`.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
pub struct DataColumnCustodyInfo {
/// The earliest slot for which data columns are available.
pub earliest_data_column_slot: Option<Slot>,
}
impl StoreItem for DataColumnCustodyInfo {
fn db_column() -> DBColumn {
DBColumn::BeaconDataColumnCustodyInfo
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
Ok(DataColumnCustodyInfo::from_ssz_bytes(bytes)?)
}
}
/// Database parameters relevant to data column sync.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
pub struct DataColumnInfo {