Persist only custody columns in db (#8188)

* Only persist custody columns

* Get claude to write tests

* lint

* Address review comments and fix tests.

* Use supernode only when building chain segments

* Clean up

* Rewrite tests.

* Fix tests

* Clippy

---------

Co-authored-by: Jimmy Chen <jchen.tc@gmail.com>
Co-authored-by: Michael Sproul <michael@sigmaprime.io>
This commit is contained in:
Pawan Dhananjay
2025-10-13 02:32:13 -07:00
committed by GitHub
parent 178df7a7d6
commit 2c328e32a6
4 changed files with 118 additions and 12 deletions

View File

@@ -3957,7 +3957,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// See https://github.com/sigp/lighthouse/issues/2028 // See https://github.com/sigp/lighthouse/issues/2028
let (_, signed_block, block_data) = signed_block.deconstruct(); let (_, signed_block, block_data) = signed_block.deconstruct();
match self.get_blobs_or_columns_store_op(block_root, block_data) { match self.get_blobs_or_columns_store_op(block_root, signed_block.slot(), block_data) {
Ok(Some(blobs_or_columns_store_op)) => { Ok(Some(blobs_or_columns_store_op)) => {
ops.push(blobs_or_columns_store_op); ops.push(blobs_or_columns_store_op);
} }
@@ -7163,6 +7163,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub(crate) fn get_blobs_or_columns_store_op( pub(crate) fn get_blobs_or_columns_store_op(
&self, &self,
block_root: Hash256, block_root: Hash256,
block_slot: Slot,
block_data: AvailableBlockData<T::EthSpec>, block_data: AvailableBlockData<T::EthSpec>,
) -> Result<Option<StoreOp<'_, T::EthSpec>>, String> { ) -> Result<Option<StoreOp<'_, T::EthSpec>>, String> {
match block_data { match block_data {
@@ -7175,7 +7176,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
); );
Ok(Some(StoreOp::PutBlobs(block_root, blobs))) Ok(Some(StoreOp::PutBlobs(block_root, blobs)))
} }
AvailableBlockData::DataColumns(data_columns) => { AvailableBlockData::DataColumns(mut data_columns) => {
let columns_to_custody = self.custody_columns_for_epoch(Some(
block_slot.epoch(T::EthSpec::slots_per_epoch()),
));
// Supernodes need to persist all sampled custody columns
if columns_to_custody.len() != self.spec.number_of_custody_groups as usize {
data_columns
.retain(|data_column| columns_to_custody.contains(&data_column.index));
}
debug!( debug!(
%block_root, %block_root,
count = data_columns.len(), count = data_columns.len(),

View File

@@ -140,7 +140,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Store the blobs or data columns too // Store the blobs or data columns too
if let Some(op) = self if let Some(op) = self
.get_blobs_or_columns_store_op(block_root, block_data) .get_blobs_or_columns_store_op(block_root, block.slot(), block_data)
.map_err(|e| { .map_err(|e| {
HistoricalBlockError::StoreError(StoreError::DBError { HistoricalBlockError::StoreError(StoreError::DBError {
message: format!("get_blobs_or_columns_store_op error {e:?}"), message: format!("get_blobs_or_columns_store_op error {e:?}"),

View File

@@ -42,7 +42,10 @@ enum DataSidecars<E: EthSpec> {
} }
async fn get_chain_segment() -> (Vec<BeaconSnapshot<E>>, Vec<Option<DataSidecars<E>>>) { async fn get_chain_segment() -> (Vec<BeaconSnapshot<E>>, Vec<Option<DataSidecars<E>>>) {
let harness = get_harness(VALIDATOR_COUNT); // The assumption that you can re-import a block based on what you have in your DB
// is no longer true, as full nodes store less than what they sample. // We use a supernode here to build a chain segment.
// We use a supernode here to build a chain segment.
let harness = get_harness(VALIDATOR_COUNT, true);
harness harness
.extend_chain( .extend_chain(
@@ -101,7 +104,10 @@ async fn get_chain_segment() -> (Vec<BeaconSnapshot<E>>, Vec<Option<DataSidecars
(segment, segment_sidecars) (segment, segment_sidecars)
} }
fn get_harness(validator_count: usize) -> BeaconChainHarness<EphemeralHarnessType<E>> { fn get_harness(
validator_count: usize,
supernode: bool,
) -> BeaconChainHarness<EphemeralHarnessType<E>> {
let harness = BeaconChainHarness::builder(MainnetEthSpec) let harness = BeaconChainHarness::builder(MainnetEthSpec)
.default_spec() .default_spec()
.chain_config(ChainConfig { .chain_config(ChainConfig {
@@ -109,6 +115,7 @@ fn get_harness(validator_count: usize) -> BeaconChainHarness<EphemeralHarnessTyp
..ChainConfig::default() ..ChainConfig::default()
}) })
.keypairs(KEYPAIRS[0..validator_count].to_vec()) .keypairs(KEYPAIRS[0..validator_count].to_vec())
.import_all_data_columns(supernode)
.fresh_ephemeral_store() .fresh_ephemeral_store()
.mock_execution_layer() .mock_execution_layer()
.build(); .build();
@@ -252,7 +259,7 @@ fn update_data_column_signed_header<E: EthSpec>(
#[tokio::test] #[tokio::test]
async fn chain_segment_full_segment() { async fn chain_segment_full_segment() {
let harness = get_harness(VALIDATOR_COUNT); let harness = get_harness(VALIDATOR_COUNT, false);
let (chain_segment, chain_segment_blobs) = get_chain_segment().await; let (chain_segment, chain_segment_blobs) = get_chain_segment().await;
let blocks: Vec<RpcBlock<E>> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) let blocks: Vec<RpcBlock<E>> = chain_segment_blocks(&chain_segment, &chain_segment_blobs)
.into_iter() .into_iter()
@@ -290,7 +297,7 @@ async fn chain_segment_full_segment() {
#[tokio::test] #[tokio::test]
async fn chain_segment_varying_chunk_size() { async fn chain_segment_varying_chunk_size() {
for chunk_size in &[1, 2, 3, 5, 31, 32, 33, 42] { for chunk_size in &[1, 2, 3, 5, 31, 32, 33, 42] {
let harness = get_harness(VALIDATOR_COUNT); let harness = get_harness(VALIDATOR_COUNT, false);
let (chain_segment, chain_segment_blobs) = get_chain_segment().await; let (chain_segment, chain_segment_blobs) = get_chain_segment().await;
let blocks: Vec<RpcBlock<E>> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) let blocks: Vec<RpcBlock<E>> = chain_segment_blocks(&chain_segment, &chain_segment_blobs)
.into_iter() .into_iter()
@@ -322,7 +329,7 @@ async fn chain_segment_varying_chunk_size() {
#[tokio::test] #[tokio::test]
async fn chain_segment_non_linear_parent_roots() { async fn chain_segment_non_linear_parent_roots() {
let harness = get_harness(VALIDATOR_COUNT); let harness = get_harness(VALIDATOR_COUNT, false);
let (chain_segment, chain_segment_blobs) = get_chain_segment().await; let (chain_segment, chain_segment_blobs) = get_chain_segment().await;
harness harness
@@ -379,7 +386,7 @@ async fn chain_segment_non_linear_parent_roots() {
#[tokio::test] #[tokio::test]
async fn chain_segment_non_linear_slots() { async fn chain_segment_non_linear_slots() {
let harness = get_harness(VALIDATOR_COUNT); let harness = get_harness(VALIDATOR_COUNT, false);
let (chain_segment, chain_segment_blobs) = get_chain_segment().await; let (chain_segment, chain_segment_blobs) = get_chain_segment().await;
harness harness
.chain .chain
@@ -521,7 +528,7 @@ async fn assert_invalid_signature(
async fn get_invalid_sigs_harness( async fn get_invalid_sigs_harness(
chain_segment: &[BeaconSnapshot<E>], chain_segment: &[BeaconSnapshot<E>],
) -> BeaconChainHarness<EphemeralHarnessType<E>> { ) -> BeaconChainHarness<EphemeralHarnessType<E>> {
let harness = get_harness(VALIDATOR_COUNT); let harness = get_harness(VALIDATOR_COUNT, false);
harness harness
.chain .chain
.slot_clock .slot_clock
@@ -979,7 +986,7 @@ fn unwrap_err<T, U>(result: Result<T, U>) -> U {
#[tokio::test] #[tokio::test]
async fn block_gossip_verification() { async fn block_gossip_verification() {
let harness = get_harness(VALIDATOR_COUNT); let harness = get_harness(VALIDATOR_COUNT, false);
let (chain_segment, chain_segment_blobs) = get_chain_segment().await; let (chain_segment, chain_segment_blobs) = get_chain_segment().await;
let block_index = CHAIN_SEGMENT_LENGTH - 2; let block_index = CHAIN_SEGMENT_LENGTH - 2;
@@ -1382,7 +1389,7 @@ async fn verify_block_for_gossip_slashing_detection() {
#[tokio::test] #[tokio::test]
async fn verify_block_for_gossip_doppelganger_detection() { async fn verify_block_for_gossip_doppelganger_detection() {
let harness = get_harness(VALIDATOR_COUNT); let harness = get_harness(VALIDATOR_COUNT, false);
let state = harness.get_current_state(); let state = harness.get_current_state();
let ((block, _), _) = harness.make_block(state.clone(), Slot::new(1)).await; let ((block, _), _) = harness.make_block(state.clone(), Slot::new(1)).await;

View File

@@ -2735,6 +2735,14 @@ async fn weak_subjectivity_sync_test(
.rng(Box::new(StdRng::seed_from_u64(42))) .rng(Box::new(StdRng::seed_from_u64(42)))
.build() .build()
.expect("should build"); .expect("should build");
beacon_chain
.data_availability_checker
.custody_context()
.init_ordered_data_columns_from_custody_groups(
(0..spec.number_of_custody_groups).collect(),
&spec,
)
.unwrap();
let beacon_chain = Arc::new(beacon_chain); let beacon_chain = Arc::new(beacon_chain);
let wss_block_root = wss_block.canonical_root(); let wss_block_root = wss_block.canonical_root();
@@ -4137,6 +4145,88 @@ async fn replay_from_split_state() {
assert_eq!(state.slot(), split.slot); assert_eq!(state.slot(), split.slot);
} }
/// Test that regular nodes filter and store only custody columns when processing blocks with data columns.
#[tokio::test]
async fn test_custody_column_filtering_regular_node() {
// Skip test if PeerDAS is not scheduled
if !test_spec::<E>().is_peer_das_scheduled() {
return;
}
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
// Generate a block with data columns
harness.execution_block_generator().set_min_blob_count(1);
let current_slot = harness.get_current_slot();
let block_root = harness
.extend_chain(
1,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
// Get custody columns for this epoch - regular nodes only store a subset
let expected_custody_columns: HashSet<_> = harness
.chain
.custody_columns_for_epoch(Some(current_slot.epoch(E::slots_per_epoch())))
.iter()
.copied()
.collect();
// Check what actually got stored in the database
let stored_column_indices: HashSet<_> = store
.get_data_column_keys(block_root)
.expect("should get stored column keys")
.into_iter()
.collect();
assert_eq!(
stored_column_indices, expected_custody_columns,
"Regular node should only store custody columns"
);
}
/// Test that supernodes store all data columns when processing blocks with data columns.
#[tokio::test]
async fn test_custody_column_filtering_supernode() {
// Skip test if PeerDAS is not scheduled
if !test_spec::<E>().is_peer_das_scheduled() {
return;
}
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT);
// Generate a block with data columns
harness.execution_block_generator().set_min_blob_count(1);
let block_root = harness
.extend_chain(
1,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
// Supernodes are expected to store all data columns
let expected_custody_columns: HashSet<_> = (0..E::number_of_columns() as u64).collect();
// Check what actually got stored in the database
let stored_column_indices: HashSet<_> = store
.get_data_column_keys(block_root)
.expect("should get stored column keys")
.into_iter()
.collect();
assert_eq!(
stored_column_indices, expected_custody_columns,
"Supernode should store all custody columns"
);
}
/// Checks that two chains are the same, for the purpose of these tests. /// Checks that two chains are the same, for the purpose of these tests.
/// ///
/// Several fields that are hard/impossible to check are ignored (e.g., the store). /// Several fields that are hard/impossible to check are ignored (e.g., the store).