Fix wrong columns getting processed on a CGC change (#7792)

This PR fixes a bug where wrong columns could get processed immediately after a CGC increase.

Scenario:
- The node's CGC increased due to additional validators attached to it (let's say from 10 to 11)
- The new CGC is advertised and new subnets are subscribed immediately; however, the change won't be effective in the data availability check until the next epoch (See [this](ab0e8870b4/beacon_node/beacon_chain/src/validator_custody.rs (L93-L99))). The data availability checker still only requires 10 columns for the current epoch.
- During this time, data columns for the additional custody column (let's say column 11) may arrive via gossip as we're already subscribed to the topic, and they may be incorrectly used to satisfy the existing data availability requirement (10 columns). This results in the additional column (instead of a required one) getting persisted, causing database inconsistency.
This commit is contained in:
Jimmy Chen
2025-08-07 10:45:04 +10:00
committed by GitHub
parent 9c972201bc
commit 8bc6693dac
27 changed files with 577 additions and 277 deletions

View File

@@ -279,7 +279,7 @@ lint:
# Lints the code using Clippy and automatically fix some simple compiler warnings.
lint-fix:
EXTRA_CLIPPY_OPTS="--fix --allow-staged --allow-dirty" $(MAKE) lint
EXTRA_CLIPPY_OPTS="--fix --allow-staged --allow-dirty" $(MAKE) lint-full
# Also run the lints on the optimized-only tests
lint-full:

View File

@@ -2206,7 +2206,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn verify_data_column_sidecar_for_gossip(
self: &Arc<Self>,
data_column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
subnet_id: u64,
subnet_id: DataColumnSubnetId,
) -> Result<GossipVerifiedDataColumn<T>, GossipDataColumnError> {
metrics::inc_counter(&metrics::DATA_COLUMN_SIDECAR_PROCESSING_REQUESTS);
let _timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES);
@@ -3594,7 +3594,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let availability = self
.data_availability_checker
.put_gossip_verified_data_columns(block_root, data_columns)?;
.put_gossip_verified_data_columns(block_root, slot, data_columns)?;
self.process_availability(slot, availability, publish_fn)
.await
@@ -3685,9 +3685,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// This slot value is purely informative for the consumers of
// `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot.
let availability = self
.data_availability_checker
.put_rpc_custody_columns(block_root, custody_columns)?;
let availability = self.data_availability_checker.put_rpc_custody_columns(
block_root,
slot,
custody_columns,
)?;
self.process_availability(slot, availability, || Ok(()))
.await
@@ -7110,6 +7112,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
roots.reverse();
roots
}
/// Returns a list of column indices that should be sampled for a given epoch.
/// Used for data availability sampling in PeerDAS.
///
/// NOTE(review): the result reflects the custody group count (CGC) that is
/// *effective* at `epoch`, which may lag behind the latest advertised CGC
/// after a recent CGC change — confirm against `CustodyContext`.
pub fn sampling_columns_for_epoch(&self, epoch: Epoch) -> &[ColumnIndex] {
self.data_availability_checker
.custody_context()
.sampling_columns_for_epoch(epoch, &self.spec)
}
}
impl<T: BeaconChainTypes> Drop for BeaconChain<T> {

View File

@@ -20,7 +20,7 @@ use tracing::{debug, error, info_span, Instrument};
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::{
BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256,
RuntimeVariableList, SignedBeaconBlock,
RuntimeVariableList, SignedBeaconBlock, Slot,
};
mod error;
@@ -76,7 +76,7 @@ pub struct DataAvailabilityChecker<T: BeaconChainTypes> {
availability_cache: Arc<DataAvailabilityCheckerInner<T>>,
slot_clock: T::SlotClock,
kzg: Arc<Kzg>,
custody_context: Arc<CustodyContext>,
custody_context: Arc<CustodyContext<T::EthSpec>>,
spec: Arc<ChainSpec>,
}
@@ -114,7 +114,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
slot_clock: T::SlotClock,
kzg: Arc<Kzg>,
store: BeaconStore<T>,
custody_context: Arc<CustodyContext>,
custody_context: Arc<CustodyContext<T::EthSpec>>,
spec: Arc<ChainSpec>,
) -> Result<Self, AvailabilityCheckError> {
let inner = DataAvailabilityCheckerInner::new(
@@ -132,8 +132,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
})
}
pub fn custody_context(&self) -> Arc<CustodyContext> {
self.custody_context.clone()
/// Returns a shared reference to this node's custody context.
///
/// The caller can clone the `Arc` if ownership is required.
pub fn custody_context(&self) -> &Arc<CustodyContext<T::EthSpec>> {
&self.custody_context
}
/// Checks if the block root is currently in the availability cache awaiting import because
@@ -233,6 +233,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
pub fn put_rpc_custody_columns(
&self,
block_root: Hash256,
slot: Slot,
custody_columns: DataColumnSidecarList<T::EthSpec>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
// Attributes fault to the specific peer that sent an invalid column
@@ -240,8 +241,17 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
KzgVerifiedDataColumn::from_batch_with_scoring(custody_columns, &self.kzg)
.map_err(AvailabilityCheckError::InvalidColumn)?;
// Filter out columns that aren't required for custody for this slot
// This is required because `data_columns_by_root` requests the **latest** CGC that _may_
// not yet be effective for the data availability check, as CGC changes are only effective from
// a new epoch.
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
let sampling_columns = self
.custody_context
.sampling_columns_for_epoch(epoch, &self.spec);
let verified_custody_columns = kzg_verified_columns
.into_iter()
.filter(|col| sampling_columns.contains(&col.index()))
.map(KzgVerifiedCustodyDataColumn::from_asserted_custody)
.collect::<Vec<_>>();
@@ -286,10 +296,16 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
>(
&self,
block_root: Hash256,
slot: Slot,
data_columns: I,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
let sampling_columns = self
.custody_context
.sampling_columns_for_epoch(epoch, &self.spec);
let custody_columns = data_columns
.into_iter()
.filter(|col| sampling_columns.contains(&col.index()))
.map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner()))
.collect::<Vec<_>>();
@@ -811,3 +827,207 @@ impl<E: EthSpec> MaybeAvailableBlock<E> {
}
}
}
#[cfg(test)]
mod test {
// Regression tests for the CGC-change bug: columns received for the *latest*
// CGC must not satisfy the data availability requirement of an epoch where a
// smaller CGC is still effective.
use super::*;
use crate::test_utils::{
generate_rand_block_and_data_columns, get_kzg, EphemeralHarnessType, NumBlobs,
};
use crate::CustodyContext;
use rand::prelude::StdRng;
use rand::seq::SliceRandom;
use rand::SeedableRng;
use slot_clock::{SlotClock, TestingSlotClock};
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use store::HotColdDB;
use types::{ChainSpec, ColumnIndex, EthSpec, ForkName, MainnetEthSpec, Slot};
type E = MainnetEthSpec;
type T = EphemeralHarnessType<E>;
/// Test to verify any extra RPC columns received that are not part of the "effective" CGC for
/// the slot are excluded from import.
#[test]
fn should_exclude_rpc_columns_not_required_for_sampling() {
// SETUP
let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);
let da_checker = new_da_checker(spec.clone());
let custody_context = &da_checker.custody_context;
let all_column_indices_ordered =
init_custody_context_with_ordered_columns(custody_context, &mut rng, &spec);
// GIVEN a single 32 ETH validator is attached slot 0
let epoch = Epoch::new(0);
let validator_0 = 0;
custody_context.register_validators(
vec![(validator_0, 32_000_000_000)],
epoch.start_slot(E::slots_per_epoch()),
&spec,
);
assert_eq!(
custody_context.num_of_data_columns_to_sample(epoch, &spec),
spec.validator_custody_requirement as usize,
"sampling size should be the minimal custody requirement == 8"
);
// WHEN additional attached validators result in a CGC increase to 10 at the end slot of the same epoch
let validator_1 = 1;
let cgc_change_slot = epoch.end_slot(E::slots_per_epoch());
custody_context.register_validators(
vec![(validator_1, 32_000_000_000 * 9)],
cgc_change_slot,
&spec,
);
// AND custody columns (8) and any new extra columns (2) are received via RPC responses.
// NOTE: block lookup uses the **latest** CGC (10) instead of the effective CGC (8) as the slot is unknown.
let (_, data_columns) = generate_rand_block_and_data_columns::<E>(
ForkName::Fulu,
NumBlobs::Number(1),
&mut rng,
&spec,
);
let block_root = Hash256::random();
let requested_columns = &all_column_indices_ordered[..10];
da_checker
.put_rpc_custody_columns(
block_root,
cgc_change_slot,
data_columns
.into_iter()
.filter(|d| requested_columns.contains(&d.index))
.collect(),
)
.expect("should put rpc custody columns");
// THEN the sampling size for the end slot of the same epoch remains unchanged
let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec);
assert_eq!(
sampling_columns.len(),
spec.validator_custody_requirement as usize // 8
);
// AND any extra columns received via RPC responses are excluded from import.
let actual_cached: HashSet<ColumnIndex> = da_checker
.cached_data_column_indexes(&block_root)
.expect("should have cached data columns")
.into_iter()
.collect();
let expected_sampling_columns = sampling_columns.iter().copied().collect::<HashSet<_>>();
assert_eq!(
actual_cached, expected_sampling_columns,
"should cache only the effective sampling columns"
);
assert!(
actual_cached.len() < requested_columns.len(),
"extra columns should be excluded"
)
}
/// Test to verify any extra gossip columns received that are not part of the "effective" CGC for
/// the slot are excluded from import.
#[test]
fn should_exclude_gossip_columns_not_required_for_sampling() {
// SETUP
let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);
let da_checker = new_da_checker(spec.clone());
let custody_context = &da_checker.custody_context;
let all_column_indices_ordered =
init_custody_context_with_ordered_columns(custody_context, &mut rng, &spec);
// GIVEN a single 32 ETH validator is attached slot 0
let epoch = Epoch::new(0);
let validator_0 = 0;
custody_context.register_validators(
vec![(validator_0, 32_000_000_000)],
epoch.start_slot(E::slots_per_epoch()),
&spec,
);
assert_eq!(
custody_context.num_of_data_columns_to_sample(epoch, &spec),
spec.validator_custody_requirement as usize,
"sampling size should be the minimal custody requirement == 8"
);
// WHEN additional attached validators result in a CGC increase to 10 at the end slot of the same epoch
let validator_1 = 1;
let cgc_change_slot = epoch.end_slot(E::slots_per_epoch());
custody_context.register_validators(
vec![(validator_1, 32_000_000_000 * 9)],
cgc_change_slot,
&spec,
);
// AND custody columns (8) and any new extra columns (2) are received via gossip.
// NOTE: CGC updates result in new topic subscriptions immediately, so extra columns may start to
// arrive via gossip.
let (_, data_columns) = generate_rand_block_and_data_columns::<E>(
ForkName::Fulu,
NumBlobs::Number(1),
&mut rng,
&spec,
);
let block_root = Hash256::random();
let requested_columns = &all_column_indices_ordered[..10];
let gossip_columns = data_columns
.into_iter()
.filter(|d| requested_columns.contains(&d.index))
.map(GossipVerifiedDataColumn::<T>::__new_for_testing)
.collect::<Vec<_>>();
da_checker
.put_gossip_verified_data_columns(block_root, cgc_change_slot, gossip_columns)
.expect("should put gossip custody columns");
// THEN the sampling size for the end slot of the same epoch remains unchanged
let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec);
assert_eq!(
sampling_columns.len(),
spec.validator_custody_requirement as usize // 8
);
// AND any extra columns received via gossip responses are excluded from import.
let actual_cached: HashSet<ColumnIndex> = da_checker
.cached_data_column_indexes(&block_root)
.expect("should have cached data columns")
.into_iter()
.collect();
let expected_sampling_columns = sampling_columns.iter().copied().collect::<HashSet<_>>();
assert_eq!(
actual_cached, expected_sampling_columns,
"should cache only the effective sampling columns"
);
assert!(
actual_cached.len() < requested_columns.len(),
"extra columns should be excluded"
)
}
/// Shuffles all custody group indices, initialises the custody context's ordered
/// column list with them, and returns the shuffled index list for the tests to use.
///
/// NOTE(review): the returned values are custody *group* indices that the tests
/// use as column indices — this only holds while groups map 1:1 to columns in
/// the spec used here; confirm.
fn init_custody_context_with_ordered_columns(
custody_context: &Arc<CustodyContext<E>>,
mut rng: &mut StdRng,
spec: &ChainSpec,
) -> Vec<u64> {
let mut all_data_columns = (0..spec.number_of_custody_groups).collect::<Vec<_>>();
all_data_columns.shuffle(&mut rng);
custody_context
.init_ordered_data_columns_from_custody_groups(all_data_columns.clone(), spec)
.expect("should initialise ordered custody columns");
all_data_columns
}
/// Builds a `DataAvailabilityChecker` backed by an ephemeral store, a testing
/// slot clock fixed at slot 0, and a non-supernode custody context.
fn new_da_checker(spec: Arc<ChainSpec>) -> DataAvailabilityChecker<T> {
let slot_clock = TestingSlotClock::new(
Slot::new(0),
Duration::from_secs(0),
Duration::from_secs(spec.seconds_per_slot),
);
let kzg = get_kzg(&spec);
let store = Arc::new(HotColdDB::open_ephemeral(<_>::default(), spec.clone()).unwrap());
let custody_context = Arc::new(CustodyContext::new(false));
DataAvailabilityChecker::new(slot_clock, kzg, store, custody_context, spec)
.expect("should initialise data availability checker")
}
}

View File

@@ -159,7 +159,7 @@ impl<E: EthSpec> PendingComponents<E> {
pub fn make_available<R>(
&mut self,
spec: &Arc<ChainSpec>,
num_expected_columns: u64,
num_expected_columns: usize,
recover: R,
) -> Result<Option<AvailableExecutedBlock<E>>, AvailabilityCheckError>
where
@@ -173,7 +173,6 @@ impl<E: EthSpec> PendingComponents<E> {
};
let num_expected_blobs = block.num_blobs_expected();
let num_expected_columns = num_expected_columns as usize;
let blob_data = if num_expected_blobs == 0 {
Some(AvailableBlockData::NoData)
} else if spec.is_peer_das_enabled_for_epoch(block.epoch()) {
@@ -311,7 +310,7 @@ impl<E: EthSpec> PendingComponents<E> {
pub fn status_str(
&self,
block_epoch: Epoch,
num_expected_columns: Option<u64>,
num_expected_columns: Option<usize>,
spec: &ChainSpec,
) -> String {
let block_count = if self.executed_block.is_some() { 1 } else { 0 };
@@ -348,7 +347,7 @@ pub struct DataAvailabilityCheckerInner<T: BeaconChainTypes> {
/// This cache holds a limited number of states in memory and reconstructs them
/// from disk when necessary. This is necessary until we merge tree-states
state_cache: StateLRUCache<T>,
custody_context: Arc<CustodyContext>,
custody_context: Arc<CustodyContext<T::EthSpec>>,
spec: Arc<ChainSpec>,
}
@@ -365,7 +364,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
pub fn new(
capacity: NonZeroUsize,
beacon_store: BeaconStore<T>,
custody_context: Arc<CustodyContext>,
custody_context: Arc<CustodyContext<T::EthSpec>>,
spec: Arc<ChainSpec>,
) -> Result<Self, AvailabilityCheckError> {
Ok(Self {
@@ -482,7 +481,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
if let Some(available_block) = pending_components.make_available(
&self.spec,
self.custody_context
.num_of_data_columns_to_sample(Some(epoch), &self.spec),
.num_of_data_columns_to_sample(epoch, &self.spec),
|block| self.state_cache.recover_pending_executed_block(block),
)? {
// We keep the pending components in the availability cache during block import (#5845).
@@ -508,10 +507,11 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
.peek()
.map(|verified_blob| verified_blob.as_data_column().epoch())
else {
// Verified data_columns list should be non-empty.
return Err(AvailabilityCheckError::Unexpected(
"empty columns".to_owned(),
));
// No columns were processed. This can occur if all received columns were filtered out
// before this point, e.g. due to a CGC change that caused extra columns to be downloaded
// before the new CGC took effect.
// Return `Ok` without marking the block as available.
return Ok(Availability::MissingComponents(block_root));
};
let mut write_lock = self.critical.write();
@@ -529,7 +529,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
let num_expected_columns = self
.custody_context
.num_of_data_columns_to_sample(Some(epoch), &self.spec);
.num_of_data_columns_to_sample(epoch, &self.spec);
debug!(
component = "data_columns",
?block_root,
@@ -627,7 +627,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
let num_expected_columns = self
.custody_context
.num_of_data_columns_to_sample(Some(epoch), &self.spec);
.num_of_data_columns_to_sample(epoch, &self.spec);
debug!(
component = "block",
?block_root,

View File

@@ -198,7 +198,7 @@ impl<T: BeaconChainTypes, O: ObservationStrategy> Clone for GossipVerifiedDataCo
impl<T: BeaconChainTypes, O: ObservationStrategy> GossipVerifiedDataColumn<T, O> {
pub fn new(
column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
subnet_id: u64,
subnet_id: DataColumnSubnetId,
chain: &BeaconChain<T>,
) -> Result<Self, GossipDataColumnError> {
let header = column_sidecar.signed_block_header.clone();
@@ -472,7 +472,7 @@ where
pub fn validate_data_column_sidecar_for_gossip<T: BeaconChainTypes, O: ObservationStrategy>(
data_column: Arc<DataColumnSidecar<T::EthSpec>>,
subnet: u64,
subnet: DataColumnSubnetId,
chain: &BeaconChain<T>,
) -> Result<GossipVerifiedDataColumn<T, O>, GossipDataColumnError> {
let column_slot = data_column.slot();
@@ -735,15 +735,14 @@ fn verify_proposer_and_signature<T: BeaconChainTypes>(
fn verify_index_matches_subnet<E: EthSpec>(
data_column: &DataColumnSidecar<E>,
subnet: u64,
subnet: DataColumnSubnetId,
spec: &ChainSpec,
) -> Result<(), GossipDataColumnError> {
let expected_subnet: u64 =
DataColumnSubnetId::from_column_index(data_column.index, spec).into();
let expected_subnet = DataColumnSubnetId::from_column_index(data_column.index, spec);
if expected_subnet != subnet {
return Err(GossipDataColumnError::InvalidSubnetId {
received: subnet,
expected: expected_subnet,
received: subnet.into(),
expected: expected_subnet.into(),
});
}
Ok(())
@@ -821,7 +820,7 @@ mod test {
};
use crate::observed_data_sidecars::Observe;
use crate::test_utils::BeaconChainHarness;
use types::{DataColumnSidecar, EthSpec, ForkName, MainnetEthSpec};
use types::{DataColumnSidecar, DataColumnSubnetId, EthSpec, ForkName, MainnetEthSpec};
type E = MainnetEthSpec;
@@ -860,7 +859,7 @@ mod test {
let result = validate_data_column_sidecar_for_gossip::<_, Observe>(
column_sidecar.into(),
index,
DataColumnSubnetId::from_column_index(index, &harness.spec),
&harness.chain,
);
assert!(matches!(

View File

@@ -31,7 +31,6 @@ use metrics::{inc_counter, TryExt};
use mockall_double::double;
use ssz_types::FixedVector;
use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash;
use std::collections::HashSet;
use std::sync::Arc;
use tracing::{debug, warn};
use types::blob_sidecar::BlobSidecarError;
@@ -73,7 +72,7 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
custody_columns: HashSet<ColumnIndex>,
custody_columns: &[ColumnIndex],
publish_fn: impl Fn(EngineGetBlobsOutput<T>) + Send + 'static,
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
fetch_and_process_engine_blobs_inner(
@@ -92,7 +91,7 @@ async fn fetch_and_process_engine_blobs_inner<T: BeaconChainTypes>(
chain_adapter: FetchBlobsBeaconAdapter<T>,
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
custody_columns: HashSet<ColumnIndex>,
custody_columns: &[ColumnIndex],
publish_fn: impl Fn(EngineGetBlobsOutput<T>) + Send + 'static,
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
let versioned_hashes = if let Some(kzg_commitments) = block
@@ -238,7 +237,7 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
versioned_hashes: Vec<VersionedHash>,
custody_columns_indices: HashSet<ColumnIndex>,
custody_columns_indices: &[ColumnIndex],
publish_fn: impl Fn(EngineGetBlobsOutput<T>) + Send + 'static,
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
let num_expected_blobs = versioned_hashes.len();
@@ -337,11 +336,12 @@ async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
blobs: Vec<Blob<T::EthSpec>>,
proofs: Vec<KzgProofs<T::EthSpec>>,
custody_columns_indices: HashSet<ColumnIndex>,
custody_columns_indices: &[ColumnIndex],
) -> Result<Vec<KzgVerifiedCustodyDataColumn<T::EthSpec>>, FetchEngineBlobError> {
let kzg = chain_adapter.kzg().clone();
let spec = chain_adapter.spec().clone();
let chain_adapter_cloned = chain_adapter.clone();
let custody_columns_indices = custody_columns_indices.to_vec();
chain_adapter
.executor()
.spawn_blocking_handle(

View File

@@ -21,6 +21,7 @@ type T = EphemeralHarnessType<E>;
mod get_blobs_v2 {
use super::*;
use types::ColumnIndex;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fetch_blobs_v2_no_blobs_in_block() {
@@ -36,12 +37,12 @@ mod get_blobs_v2 {
mock_adapter.expect_get_blobs_v2().times(0);
mock_adapter.expect_process_engine_blobs().times(0);
let custody_columns = hashset![0, 1, 2];
let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
Arc::new(block),
custody_columns.clone(),
&custody_columns,
publish_fn,
)
.await
@@ -61,12 +62,12 @@ mod get_blobs_v2 {
mock_get_blobs_v2_response(&mut mock_adapter, None);
// Trigger fetch blobs on the block
let custody_columns = hashset![0, 1, 2];
let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
block,
custody_columns.clone(),
&custody_columns,
publish_fn,
)
.await
@@ -89,12 +90,12 @@ mod get_blobs_v2 {
mock_adapter.expect_process_engine_blobs().times(0);
// Trigger fetch blobs on the block
let custody_columns = hashset![0, 1, 2];
let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
block,
custody_columns.clone(),
&custody_columns,
publish_fn,
)
.await
@@ -122,12 +123,12 @@ mod get_blobs_v2 {
mock_adapter.expect_process_engine_blobs().times(0);
// Trigger fetch blobs on the block
let custody_columns = hashset![0, 1, 2];
let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
block,
custody_columns.clone(),
&custody_columns,
publish_fn,
)
.await
@@ -161,12 +162,12 @@ mod get_blobs_v2 {
mock_adapter.expect_process_engine_blobs().times(0);
// **WHEN**: Trigger `fetch_blobs` on the block
let custody_columns = hashset![0, 1, 2];
let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
block,
custody_columns.clone(),
&custody_columns,
publish_fn,
)
.await
@@ -203,12 +204,12 @@ mod get_blobs_v2 {
);
// Trigger fetch blobs on the block
let custody_columns = hashset![0, 1, 2];
let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
block,
custody_columns.clone(),
&custody_columns,
publish_fn,
)
.await
@@ -252,6 +253,7 @@ mod get_blobs_v1 {
use super::*;
use crate::block_verification_types::AsBlock;
use std::collections::HashSet;
use types::ColumnIndex;
const ELECTRA_FORK: ForkName = ForkName::Electra;
@@ -268,12 +270,12 @@ mod get_blobs_v1 {
mock_adapter.expect_get_blobs_v1().times(0);
// WHEN: Trigger fetch blobs on the block
let custody_columns = hashset![0, 1, 2];
let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
Arc::new(block_no_blobs),
custody_columns,
&custody_columns,
publish_fn,
)
.await
@@ -295,12 +297,12 @@ mod get_blobs_v1 {
mock_get_blobs_v1_response(&mut mock_adapter, vec![None; expected_blob_count]);
// WHEN: Trigger fetch blobs on the block
let custody_columns = hashset![0, 1, 2];
let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
block,
custody_columns,
&custody_columns,
publish_fn,
)
.await
@@ -341,12 +343,12 @@ mod get_blobs_v1 {
);
// WHEN: Trigger fetch blobs on the block
let custody_columns = hashset![0, 1, 2];
let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
block,
custody_columns,
&custody_columns,
publish_fn,
)
.await
@@ -381,12 +383,12 @@ mod get_blobs_v1 {
mock_fork_choice_contains_block(&mut mock_adapter, vec![block.canonical_root()]);
// WHEN: Trigger fetch blobs on the block
let custody_columns = hashset![0, 1, 2];
let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
block,
custody_columns,
&custody_columns,
publish_fn,
)
.await
@@ -429,12 +431,12 @@ mod get_blobs_v1 {
.returning(move |_, _| Some(all_blob_indices.clone()));
// **WHEN**: Trigger `fetch_blobs` on the block
let custody_columns = hashset![0, 1, 2];
let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
block,
custody_columns,
&custody_columns,
publish_fn,
)
.await
@@ -473,12 +475,12 @@ mod get_blobs_v1 {
);
// Trigger fetch blobs on the block
let custody_columns = hashset![0, 1, 2];
let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
block,
custody_columns,
&custody_columns,
publish_fn,
)
.await

View File

@@ -606,6 +606,15 @@ where
let chain = builder.build().expect("should build");
chain
.data_availability_checker
.custody_context()
.init_ordered_data_columns_from_custody_groups(
(0..spec.number_of_custody_groups).collect(),
&spec,
)
.expect("should initialise custody context");
BeaconChainHarness {
spec: chain.spec.clone(),
chain: Arc::new(chain),
@@ -773,13 +782,6 @@ where
(0..self.validator_keypairs.len()).collect()
}
pub fn get_sampling_column_count(&self) -> usize {
self.chain
.data_availability_checker
.custody_context()
.num_of_data_columns_to_sample(None, &self.chain.spec) as usize
}
pub fn slots_per_epoch(&self) -> u64 {
E::slots_per_epoch()
}
@@ -2385,7 +2387,8 @@ where
blob_items: Option<(KzgProofs<E>, BlobsList<E>)>,
) -> Result<RpcBlock<E>, BlockError> {
Ok(if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
let sampling_column_count = self.get_sampling_column_count();
let epoch = block.slot().epoch(E::slots_per_epoch());
let sampling_columns = self.chain.sampling_columns_for_epoch(epoch);
if blob_items.is_some_and(|(_, blobs)| !blobs.is_empty()) {
// Note: this method ignores the actual custody columns and just take the first
@@ -2393,7 +2396,7 @@ where
// currently have any knowledge of the columns being custodied.
let columns = generate_data_column_sidecars_from_block(&block, &self.spec)
.into_iter()
.take(sampling_column_count)
.filter(|d| sampling_columns.contains(&d.index))
.map(CustodyDataColumn::from_asserted_custody)
.collect::<Vec<_>>();
RpcBlock::new_with_custody_columns(Some(block_root), block, columns, &self.spec)?
@@ -3123,17 +3126,22 @@ where
let is_peerdas_enabled = self.chain.spec.is_peer_das_enabled_for_epoch(block.epoch());
if is_peerdas_enabled {
let custody_columns = custody_columns_opt.unwrap_or_else(|| {
let sampling_column_count = self.get_sampling_column_count() as u64;
(0..sampling_column_count).collect()
let epoch = block.slot().epoch(E::slots_per_epoch());
self.chain
.sampling_columns_for_epoch(epoch)
.iter()
.copied()
.collect()
});
let verified_columns = generate_data_column_sidecars_from_block(block, &self.spec)
.into_iter()
.filter(|c| custody_columns.contains(&c.index))
.map(|sidecar| {
let column_index = sidecar.index;
let subnet_id =
DataColumnSubnetId::from_column_index(sidecar.index, &self.spec);
self.chain
.verify_data_column_sidecar_for_gossip(sidecar, column_index)
.verify_data_column_sidecar_for_gossip(sidecar, subnet_id)
})
.collect::<Result<Vec<_>, _>>()
.unwrap();

View File

@@ -1,12 +1,13 @@
use parking_lot::RwLock;
use ssz_derive::{Decode, Encode};
use std::marker::PhantomData;
use std::sync::OnceLock;
use std::{
collections::{BTreeMap, HashMap},
sync::atomic::{AtomicU64, Ordering},
};
use parking_lot::RwLock;
use ssz_derive::{Decode, Encode};
use types::{ChainSpec, Epoch, EthSpec, Slot};
use types::data_column_custody_group::{compute_columns_for_custody_group, CustodyIndex};
use types::{ChainSpec, ColumnIndex, Epoch, EthSpec, Slot};
/// A delay before making the CGC change effective to the data availability checker.
const CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS: u64 = 30;
@@ -120,7 +121,7 @@ fn get_validators_custody_requirement(validator_custody_units: u64, spec: &Chain
/// Contains all the information the node requires to calculate the
/// number of columns to be custodied when checking for DA.
#[derive(Debug)]
pub struct CustodyContext {
pub struct CustodyContext<E: EthSpec> {
/// The Number of custody groups required based on the number of validators
/// that is attached to this node.
///
@@ -139,9 +140,13 @@ pub struct CustodyContext {
persisted_is_supernode: bool,
/// Maintains all the validators that this node is connected to currently
validator_registrations: RwLock<ValidatorRegistrations>,
/// Stores an immutable, ordered list of all custody columns as determined by the node's NodeID
/// on startup.
all_custody_columns_ordered: OnceLock<Box<[ColumnIndex]>>,
_phantom_data: PhantomData<E>,
}
impl CustodyContext {
impl<E: EthSpec> CustodyContext<E> {
/// Create a new custody default custody context object when no persisted object
/// exists.
///
@@ -152,6 +157,8 @@ impl CustodyContext {
current_is_supernode: is_supernode,
persisted_is_supernode: is_supernode,
validator_registrations: Default::default(),
all_custody_columns_ordered: OnceLock::new(),
_phantom_data: PhantomData,
}
}
@@ -170,16 +177,44 @@ impl CustodyContext {
.into_iter()
.collect(),
}),
all_custody_columns_ordered: OnceLock::new(),
_phantom_data: PhantomData,
}
}
/// Initializes an ordered list of data columns based on provided custody groups.
///
/// The computed list is stored in a `OnceLock`, so this may only succeed once;
/// a second call fails because the value is already set.
///
/// # Arguments
/// * `all_custody_groups_ordered` - Vector of custody group indices to map to columns
/// * `spec` - Chain specification containing custody parameters
///
/// # Returns
/// Ok(()) if initialization succeeds, Err with description string if it fails
/// (column computation for a group failed, or the list was already initialised)
pub fn init_ordered_data_columns_from_custody_groups(
&self,
all_custody_groups_ordered: Vec<CustodyIndex>,
spec: &ChainSpec,
) -> Result<(), String> {
// Flatten each custody group into its member column indices, preserving the
// caller-provided group order.
let mut ordered_custody_columns = vec![];
for custody_index in all_custody_groups_ordered {
let columns = compute_columns_for_custody_group(custody_index, spec)
.map_err(|e| format!("Failed to compute columns for custody group {e:?}"))?;
ordered_custody_columns.extend(columns);
}
// `OnceLock::set` errors if a value is already present, rejecting re-initialisation.
self.all_custody_columns_ordered
.set(ordered_custody_columns.into_boxed_slice())
.map_err(|_| {
"Failed to initialise CustodyContext with computed custody columns".to_string()
})
}
/// Register a new validator index and updates the list of validators if required.
///
/// Also modifies the internal structures if the validator custody has changed to
/// update the `custody_column_count`.
///
/// Returns `Some` along with the updated custody group count if it has changed, otherwise returns `None`.
pub fn register_validators<E: EthSpec>(
pub fn register_validators(
&self,
validators_and_balance: ValidatorsAndBalances,
current_slot: Slot,
@@ -215,8 +250,7 @@ impl CustodyContext {
);
return Some(CustodyCountChanged {
new_custody_group_count: updated_cgc,
sampling_count: self
.num_of_custody_groups_to_sample(Some(effective_epoch), spec),
sampling_count: self.num_of_custody_groups_to_sample(effective_epoch, spec),
effective_epoch,
});
}
@@ -248,39 +282,48 @@ impl CustodyContext {
/// minimum sampling size which may exceed the custody group count (CGC).
///
/// See also: [`Self::num_of_custody_groups_to_sample`].
fn custody_group_count_at_epoch(&self, epoch_opt: Option<Epoch>, spec: &ChainSpec) -> u64 {
fn custody_group_count_at_epoch(&self, epoch: Epoch, spec: &ChainSpec) -> u64 {
let custody_group_count = if self.current_is_supernode {
spec.number_of_custody_groups
} else if let Some(epoch) = epoch_opt {
} else {
self.validator_registrations
.read()
.custody_requirement_at_epoch(epoch)
.unwrap_or(spec.custody_requirement)
} else {
self.custody_group_count_at_head(spec)
};
custody_group_count
}
/// Returns the count of custody groups this node must _sample_ for a block at `epoch` to import.
/// If an `epoch` is not specified, returns the *current* validator custody requirement.
pub fn num_of_custody_groups_to_sample(
&self,
epoch_opt: Option<Epoch>,
spec: &ChainSpec,
) -> u64 {
let custody_group_count = self.custody_group_count_at_epoch(epoch_opt, spec);
pub fn num_of_custody_groups_to_sample(&self, epoch: Epoch, spec: &ChainSpec) -> u64 {
let custody_group_count = self.custody_group_count_at_epoch(epoch, spec);
spec.sampling_size_custody_groups(custody_group_count)
.expect("should compute node sampling size from valid chain spec")
}
/// Returns the count of columns this node must _sample_ for a block at `epoch` to import.
/// If an `epoch` is not specified, returns the *current* validator custody requirement.
pub fn num_of_data_columns_to_sample(&self, epoch_opt: Option<Epoch>, spec: &ChainSpec) -> u64 {
let custody_group_count = self.custody_group_count_at_epoch(epoch_opt, spec);
pub fn num_of_data_columns_to_sample(&self, epoch: Epoch, spec: &ChainSpec) -> usize {
let custody_group_count = self.custody_group_count_at_epoch(epoch, spec);
spec.sampling_size_columns(custody_group_count)
.expect("should compute node sampling size from valid chain spec")
}
/// Returns the ordered column indices this node must sample for data availability
/// checking at the given epoch.
///
/// # Parameters
/// * `epoch` - Epoch to determine sampling columns for
/// * `spec` - Chain specification containing sampling parameters
///
/// # Returns
/// A prefix of the node's ordered custody columns, sized by the epoch's sampling
/// requirement.
///
/// # Panics
/// Panics if the ordered column list has not been initialised via
/// [`Self::init_ordered_data_columns_from_custody_groups`].
pub fn sampling_columns_for_epoch(&self, epoch: Epoch, spec: &ChainSpec) -> &[ColumnIndex] {
    let ordered_columns = self
        .all_custody_columns_ordered
        .get()
        .expect("all_custody_columns_ordered should be initialized");
    // The sampling requirement is epoch-dependent: a CGC increase only becomes
    // effective from its effective epoch onwards.
    let sample_count = self.num_of_data_columns_to_sample(epoch, spec);
    &ordered_columns[..sample_count]
}
}
/// The custody count changed because of a change in the
@@ -299,8 +342,8 @@ pub struct CustodyContextSsz {
pub epoch_validator_custody_requirements: Vec<(Epoch, u64)>,
}
impl From<&CustodyContext> for CustodyContextSsz {
fn from(context: &CustodyContext) -> Self {
impl<E: EthSpec> From<&CustodyContext<E>> for CustodyContextSsz {
fn from(context: &CustodyContext<E>) -> Self {
CustodyContextSsz {
validator_custody_at_head: context.validator_custody_count.load(Ordering::Relaxed),
persisted_is_supernode: context.persisted_is_supernode,
@@ -317,6 +360,8 @@ impl From<&CustodyContext> for CustodyContextSsz {
#[cfg(test)]
mod tests {
use rand::seq::SliceRandom;
use rand::thread_rng;
use types::MainnetEthSpec;
use super::*;
@@ -325,21 +370,21 @@ mod tests {
#[test]
fn no_validators_supernode_default() {
let custody_context = CustodyContext::new(true);
let custody_context = CustodyContext::<E>::new(true);
let spec = E::default_spec();
assert_eq!(
custody_context.custody_group_count_at_head(&spec),
spec.number_of_custody_groups
);
assert_eq!(
custody_context.num_of_custody_groups_to_sample(None, &spec),
custody_context.num_of_custody_groups_to_sample(Epoch::new(0), &spec),
spec.number_of_custody_groups
);
}
#[test]
fn no_validators_fullnode_default() {
let custody_context = CustodyContext::new(false);
let custody_context = CustodyContext::<E>::new(false);
let spec = E::default_spec();
assert_eq!(
custody_context.custody_group_count_at_head(&spec),
@@ -347,14 +392,14 @@ mod tests {
"head custody count should be minimum spec custody requirement"
);
assert_eq!(
custody_context.num_of_custody_groups_to_sample(None, &spec),
custody_context.num_of_custody_groups_to_sample(Epoch::new(0), &spec),
spec.samples_per_slot
);
}
#[test]
fn register_single_validator_should_update_cgc() {
let custody_context = CustodyContext::new(false);
let custody_context = CustodyContext::<E>::new(false);
let spec = E::default_spec();
let bal_per_additional_group = spec.balance_per_additional_custody_group;
let min_val_custody_requirement = spec.validator_custody_requirement;
@@ -369,7 +414,7 @@ mod tests {
(vec![(0, 10 * bal_per_additional_group)], Some(10)),
];
register_validators_and_assert_cgc(
register_validators_and_assert_cgc::<E>(
&custody_context,
validators_and_expected_cgc_change,
&spec,
@@ -378,7 +423,7 @@ mod tests {
#[test]
fn register_multiple_validators_should_update_cgc() {
let custody_context = CustodyContext::new(false);
let custody_context = CustodyContext::<E>::new(false);
let spec = E::default_spec();
let bal_per_additional_group = spec.balance_per_additional_custody_group;
let min_val_custody_requirement = spec.validator_custody_requirement;
@@ -406,12 +451,16 @@ mod tests {
),
];
register_validators_and_assert_cgc(&custody_context, validators_and_expected_cgc, &spec);
register_validators_and_assert_cgc::<E>(
&custody_context,
validators_and_expected_cgc,
&spec,
);
}
#[test]
fn register_validators_should_not_update_cgc_for_supernode() {
let custody_context = CustodyContext::new(true);
let custody_context = CustodyContext::<E>::new(true);
let spec = E::default_spec();
let bal_per_additional_group = spec.balance_per_additional_custody_group;
@@ -435,23 +484,29 @@ mod tests {
),
];
register_validators_and_assert_cgc(&custody_context, validators_and_expected_cgc, &spec);
register_validators_and_assert_cgc::<E>(
&custody_context,
validators_and_expected_cgc,
&spec,
);
let current_epoch = Epoch::new(2);
assert_eq!(
custody_context.num_of_custody_groups_to_sample(None, &spec),
custody_context.num_of_custody_groups_to_sample(current_epoch, &spec),
spec.number_of_custody_groups
);
}
#[test]
fn cgc_change_should_be_effective_to_sampling_after_delay() {
let custody_context = CustodyContext::new(false);
let custody_context = CustodyContext::<E>::new(false);
let spec = E::default_spec();
let current_slot = Slot::new(10);
let current_epoch = current_slot.epoch(E::slots_per_epoch());
let default_sampling_size = custody_context.num_of_custody_groups_to_sample(None, &spec);
let default_sampling_size =
custody_context.num_of_custody_groups_to_sample(current_epoch, &spec);
let validator_custody_units = 10;
let _cgc_changed = custody_context.register_validators::<E>(
let _cgc_changed = custody_context.register_validators(
vec![(
0,
validator_custody_units * spec.balance_per_additional_custody_group,
@@ -462,26 +517,26 @@ mod tests {
// CGC update is not applied for `current_epoch`.
assert_eq!(
custody_context.num_of_custody_groups_to_sample(Some(current_epoch), &spec),
custody_context.num_of_custody_groups_to_sample(current_epoch, &spec),
default_sampling_size
);
// CGC update is applied for the next epoch.
assert_eq!(
custody_context.num_of_custody_groups_to_sample(Some(current_epoch + 1), &spec),
custody_context.num_of_custody_groups_to_sample(current_epoch + 1, &spec),
validator_custody_units
);
}
#[test]
fn validator_dropped_after_no_registrations_within_expiry_should_not_reduce_cgc() {
let custody_context = CustodyContext::new(false);
let custody_context = CustodyContext::<E>::new(false);
let spec = E::default_spec();
let current_slot = Slot::new(10);
let val_custody_units_1 = 10;
let val_custody_units_2 = 5;
// GIVEN val_1 and val_2 registered at `current_slot`
let _ = custody_context.register_validators::<E>(
let _ = custody_context.register_validators(
vec![
(
1,
@@ -497,7 +552,7 @@ mod tests {
);
// WHEN val_1 re-registered, but val_2 did not re-register after `VALIDATOR_REGISTRATION_EXPIRY_SLOTS + 1` slots
let cgc_changed_opt = custody_context.register_validators::<E>(
let cgc_changed_opt = custody_context.register_validators(
vec![(
1,
val_custody_units_1 * spec.balance_per_additional_custody_group,
@@ -516,7 +571,7 @@ mod tests {
#[test]
fn validator_dropped_after_no_registrations_within_expiry() {
let custody_context = CustodyContext::new(false);
let custody_context = CustodyContext::<E>::new(false);
let spec = E::default_spec();
let current_slot = Slot::new(10);
let val_custody_units_1 = 10;
@@ -524,7 +579,7 @@ mod tests {
let val_custody_units_3 = 6;
// GIVEN val_1 and val_2 registered at `current_slot`
let _ = custody_context.register_validators::<E>(
let _ = custody_context.register_validators(
vec![
(
1,
@@ -540,7 +595,7 @@ mod tests {
);
// WHEN val_1 and val_3 registered, but val_3 did not re-register after `VALIDATOR_REGISTRATION_EXPIRY_SLOTS + 1` slots
let cgc_changed = custody_context.register_validators::<E>(
let cgc_changed = custody_context.register_validators(
vec![
(
1,
@@ -564,9 +619,40 @@ mod tests {
);
}
/// Update validator every epoch and assert cgc against expected values.
fn register_validators_and_assert_cgc(
custody_context: &CustodyContext,
#[test]
fn should_init_ordered_data_columns_and_return_sampling_columns() {
    let spec = E::default_spec();
    let custody_context = CustodyContext::<E>::new(false);
    let sampling_size = custody_context.num_of_data_columns_to_sample(Epoch::new(0), &spec);

    // Initialise the context from a randomly shuffled custody-group ordering.
    let mut shuffled_groups = (0..spec.number_of_custody_groups).collect::<Vec<_>>();
    shuffled_groups.shuffle(&mut thread_rng());
    custody_context
        .init_ordered_data_columns_from_custody_groups(shuffled_groups.clone(), &spec)
        .expect("should initialise ordered data columns");

    // Expected result: expand the shuffled groups into columns (preserving order)
    // and keep only the first `sampling_size` of them.
    let mut expected_sampling_columns = Vec::new();
    for custody_index in &shuffled_groups {
        expected_sampling_columns.extend(
            compute_columns_for_custody_group(*custody_index, &spec)
                .expect("should compute columns for custody group"),
        );
    }
    expected_sampling_columns.truncate(sampling_size);

    assert_eq!(
        custody_context.sampling_columns_for_epoch(Epoch::new(0), &spec),
        expected_sampling_columns.as_slice()
    )
}
/// Update the validator every epoch and assert cgc against expected values.
fn register_validators_and_assert_cgc<E: EthSpec>(
custody_context: &CustodyContext<E>,
validators_and_expected_cgc_changed: Vec<(ValidatorsAndBalances, Option<u64>)>,
spec: &ChainSpec,
) {
@@ -575,7 +661,7 @@ mod tests {
{
let epoch = Epoch::new(idx as u64);
let updated_custody_count_opt = custody_context
.register_validators::<E>(
.register_validators(
validators_and_balance,
epoch.start_slot(E::slots_per_epoch()),
spec,

View File

@@ -1311,7 +1311,7 @@ async fn verify_and_process_gossip_data_sidecars(
);
harness.chain.verify_data_column_sidecar_for_gossip(
column_sidecar.into_inner(),
*subnet_id,
subnet_id,
)
})
.collect::<Result<Vec<_>, _>>()

View File

@@ -8,7 +8,7 @@ use std::sync::Arc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::test_utils::TestRandom;
use types::{
BlobSidecar, DataColumnSidecar, EthSpec, ForkName, MinimalEthSpec, RuntimeVariableList,
BlobSidecar, DataColumnSidecar, EthSpec, ForkName, MinimalEthSpec, RuntimeVariableList, Slot,
};
type E = MinimalEthSpec;
@@ -64,8 +64,17 @@ async fn data_column_sidecar_event_on_process_gossip_data_column() {
// build and process a gossip verified data column
let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);
let sidecar = Arc::new(DataColumnSidecar::random_for_test(&mut rng));
let gossip_verified_data_column = GossipVerifiedDataColumn::__new_for_testing(sidecar);
let sidecar = {
// DA checker only accepts sampling columns, so we need to create one with a sampling index.
let mut random_sidecar = DataColumnSidecar::random_for_test(&mut rng);
let slot = Slot::new(10);
let epoch = slot.epoch(E::slots_per_epoch());
random_sidecar.signed_block_header.message.slot = slot;
random_sidecar.index = harness.chain.sampling_columns_for_epoch(epoch)[0];
random_sidecar
};
let gossip_verified_data_column =
GossipVerifiedDataColumn::__new_for_testing(Arc::new(sidecar));
let expected_sse_data_column = SseDataColumnSidecar::from_data_column_sidecar(
gossip_verified_data_column.as_data_column(),
);

View File

@@ -42,6 +42,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
use store::database::interface::BeaconNodeBackend;
use timer::spawn_timer;
use tracing::{debug, info, warn};
use types::data_column_custody_group::get_custody_groups_ordered;
use types::{
test_utils::generate_deterministic_keypairs, BeaconState, BlobSidecarList, ChainSpec, EthSpec,
ExecutionBlockHash, Hash256, SignedBeaconBlock,
@@ -477,7 +478,7 @@ where
};
let (network_globals, network_senders) = NetworkService::start(
beacon_chain,
beacon_chain.clone(),
config,
context.executor,
libp2p_registry.as_mut(),
@@ -486,6 +487,8 @@ where
.await
.map_err(|e| format!("Failed to start network: {:?}", e))?;
init_custody_context(beacon_chain, &network_globals)?;
self.network_globals = Some(network_globals);
self.network_senders = Some(network_senders);
self.libp2p_registry = libp2p_registry;
@@ -787,6 +790,21 @@ where
}
}
/// Seeds the chain's `CustodyContext` with the node's ordered custody columns,
/// derived deterministically from the local ENR node id.
///
/// Must run after the network stack has produced a local ENR; returns an error
/// string if custody groups cannot be computed or the context was already seeded.
fn init_custody_context<T: BeaconChainTypes>(
    chain: Arc<BeaconChain<T>>,
    network_globals: &NetworkGlobals<T::EthSpec>,
) -> Result<(), String> {
    let spec = &chain.spec;

    // Every node computes an ordering over ALL custody groups; the epoch-aware
    // sampling requirement later selects a prefix of this ordering.
    let ordered_groups = get_custody_groups_ordered(
        network_globals.local_enr().node_id().raw(),
        spec.number_of_custody_groups,
        spec,
    )
    .map_err(|e| format!("Failed to compute custody groups: {:?}", e))?;

    chain
        .data_availability_checker
        .custody_context()
        .init_ordered_data_columns_from_custody_groups(ordered_groups, spec)
}
impl<TSlotClock, E, THotStore, TColdStore>
ClientBuilder<Witness<TSlotClock, E, THotStore, TColdStore>>
where

View File

@@ -1433,14 +1433,12 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(network_globals.clone())
.then(
move |value: serde_json::Value,
consensus_version: ForkName,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let request = PublishBlockRequest::<T::EthSpec>::context_deserialize(
&value,
@@ -1456,7 +1454,6 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
BroadcastValidation::default(),
duplicate_block_status_code,
network_globals,
)
.await
})
@@ -1472,14 +1469,12 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(network_globals.clone())
.then(
move |block_bytes: Bytes,
consensus_version: ForkName,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block_contents = PublishBlockRequest::<T::EthSpec>::from_ssz_bytes(
&block_bytes,
@@ -1495,7 +1490,6 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
BroadcastValidation::default(),
duplicate_block_status_code,
network_globals,
)
.await
})
@@ -1512,15 +1506,13 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(network_globals.clone())
.then(
move |validation_level: api_types::BroadcastValidationQuery,
value: serde_json::Value,
consensus_version: ForkName,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let request = PublishBlockRequest::<T::EthSpec>::context_deserialize(
&value,
@@ -1537,7 +1529,6 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
validation_level.broadcast_validation,
duplicate_block_status_code,
network_globals,
)
.await
})
@@ -1554,15 +1545,13 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(network_globals.clone())
.then(
move |validation_level: api_types::BroadcastValidationQuery,
block_bytes: Bytes,
consensus_version: ForkName,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block_contents = PublishBlockRequest::<T::EthSpec>::from_ssz_bytes(
&block_bytes,
@@ -1578,7 +1567,6 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
validation_level.broadcast_validation,
duplicate_block_status_code,
network_globals,
)
.await
})
@@ -1598,13 +1586,11 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(network_globals.clone())
.then(
move |block_contents: Arc<SignedBlindedBeaconBlock<T::EthSpec>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_blinded_block(
block_contents,
@@ -1612,7 +1598,6 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
BroadcastValidation::default(),
duplicate_block_status_code,
network_globals,
)
.await
})
@@ -1628,13 +1613,11 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(network_globals.clone())
.then(
move |block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block = SignedBlindedBeaconBlock::<T::EthSpec>::from_ssz_bytes(
&block_bytes,
@@ -1650,7 +1633,6 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
BroadcastValidation::default(),
duplicate_block_status_code,
network_globals,
)
.await
})
@@ -1666,14 +1648,12 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(network_globals.clone())
.then(
move |validation_level: api_types::BroadcastValidationQuery,
blinded_block: Arc<SignedBlindedBeaconBlock<T::EthSpec>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_blinded_block(
blinded_block,
@@ -1681,7 +1661,6 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
validation_level.broadcast_validation,
duplicate_block_status_code,
network_globals,
)
.await
})
@@ -1697,14 +1676,12 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(network_globals.clone())
.then(
move |validation_level: api_types::BroadcastValidationQuery,
block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block = SignedBlindedBeaconBlock::<T::EthSpec>::from_ssz_bytes(
&block_bytes,
@@ -1720,7 +1697,6 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
validation_level.broadcast_validation,
duplicate_block_status_code,
network_globals,
)
.await
})
@@ -3839,11 +3815,8 @@ pub fn serve<T: BeaconChainTypes>(
if let Some(cgc_change) = chain
.data_availability_checker
.custody_context()
.register_validators::<T::EthSpec>(
validators_and_balances,
current_slot,
&chain.spec,
) {
.register_validators(validators_and_balances, current_slot, &chain.spec)
{
chain.update_data_column_custody_info(Some(
cgc_change
.effective_epoch

View File

@@ -15,7 +15,7 @@ use eth2::types::{
};
use execution_layer::{ProvenancedPayload, SubmitBlindedBlockResponse};
use futures::TryFutureExt;
use lighthouse_network::{NetworkGlobals, PubsubMessage};
use lighthouse_network::PubsubMessage;
use network::NetworkMessage;
use rand::prelude::SliceRandom;
use slot_clock::SlotClock;
@@ -82,7 +82,6 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>>(
network_tx: &UnboundedSender<NetworkMessage<T::EthSpec>>,
validation_level: BroadcastValidation,
duplicate_status_code: StatusCode,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
) -> Result<Response, Rejection> {
let seen_timestamp = timestamp_now();
let block_publishing_delay_for_testing = chain.config.block_publishing_delay;
@@ -223,7 +222,8 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>>(
publish_column_sidecars(network_tx, &gossip_verified_columns, &chain).map_err(|_| {
warp_utils::reject::custom_server_error("unable to publish data column sidecars".into())
})?;
let sampling_columns_indices = &network_globals.sampling_columns();
let epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
let sampling_columns_indices = chain.sampling_columns_for_epoch(epoch);
let sampling_columns = gossip_verified_columns
.into_iter()
.flatten()
@@ -405,7 +405,7 @@ fn build_gossip_verified_data_columns<T: BeaconChainTypes>(
let column_index = data_column_sidecar.index;
let subnet = DataColumnSubnetId::from_column_index(column_index, &chain.spec);
let gossip_verified_column =
GossipVerifiedDataColumn::new(data_column_sidecar, subnet.into(), chain);
GossipVerifiedDataColumn::new(data_column_sidecar, subnet, chain);
match gossip_verified_column {
Ok(blob) => Ok(Some(blob)),
@@ -633,7 +633,6 @@ pub async fn publish_blinded_block<T: BeaconChainTypes>(
network_tx: &UnboundedSender<NetworkMessage<T::EthSpec>>,
validation_level: BroadcastValidation,
duplicate_status_code: StatusCode,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
) -> Result<Response, Rejection> {
let block_root = blinded_block.canonical_root();
let full_block_opt = reconstruct_block(chain.clone(), block_root, blinded_block).await?;
@@ -646,7 +645,6 @@ pub async fn publish_blinded_block<T: BeaconChainTypes>(
network_tx,
validation_level,
duplicate_status_code,
network_globals,
)
.await
} else {

View File

@@ -372,7 +372,6 @@ pub async fn consensus_partial_pass_only_consensus() {
/* submit `block_b` which should induce equivocation */
let channel = tokio::sync::mpsc::unbounded_channel();
let network_globals = tester.ctx.network_globals.clone().unwrap();
let publication_result = publish_block(
None,
@@ -381,7 +380,6 @@ pub async fn consensus_partial_pass_only_consensus() {
&channel.0,
validation_level,
StatusCode::ACCEPTED,
network_globals,
)
.await;
@@ -663,7 +661,6 @@ pub async fn equivocation_consensus_late_equivocation() {
assert!(gossip_block_a.is_err());
let channel = tokio::sync::mpsc::unbounded_channel();
let network_globals = tester.ctx.network_globals.clone().unwrap();
let publication_result = publish_block(
None,
@@ -672,7 +669,6 @@ pub async fn equivocation_consensus_late_equivocation() {
&channel.0,
validation_level,
StatusCode::ACCEPTED,
network_globals,
)
.await;
@@ -1302,7 +1298,6 @@ pub async fn blinded_equivocation_consensus_late_equivocation() {
assert!(gossip_block_a.is_err());
let channel = tokio::sync::mpsc::unbounded_channel();
let network_globals = tester.ctx.network_globals.clone().unwrap();
let publication_result = publish_blinded_block(
block_b,
@@ -1310,7 +1305,6 @@ pub async fn blinded_equivocation_consensus_late_equivocation() {
&channel.0,
validation_level,
StatusCode::ACCEPTED,
network_globals,
)
.await;
@@ -1487,7 +1481,7 @@ pub async fn block_seen_on_gossip_with_some_blobs_or_columns() {
&block,
partial_blobs.iter(),
partial_kzg_proofs.iter(),
Some(get_custody_columns(&tester)),
Some(get_custody_columns(&tester, block.slot())),
)
.await;
@@ -1558,7 +1552,7 @@ pub async fn blobs_or_columns_seen_on_gossip_without_block() {
&block,
blobs.iter(),
kzg_proofs.iter(),
Some(get_custody_columns(&tester)),
Some(get_custody_columns(&tester, block.slot())),
)
.await;
@@ -1629,7 +1623,7 @@ async fn blobs_or_columns_seen_on_gossip_without_block_and_no_http_blobs_or_colu
&block,
blobs.iter(),
kzg_proofs.iter(),
Some(get_custody_columns(&tester)),
Some(get_custody_columns(&tester, block.slot())),
)
.await;
@@ -1703,7 +1697,7 @@ async fn slashable_blobs_or_columns_seen_on_gossip_cause_failure() {
&block_b,
blobs_b.iter(),
kzg_proofs_b.iter(),
Some(get_custody_columns(&tester)),
Some(get_custody_columns(&tester, block_b.slot())),
)
.await;
@@ -1802,11 +1796,15 @@ fn assert_server_message_error(error_response: eth2::Error, expected_message: St
assert_eq!(err.message, expected_message);
}
fn get_custody_columns(tester: &InteractiveTester<E>) -> HashSet<ColumnIndex> {
fn get_custody_columns(tester: &InteractiveTester<E>, slot: Slot) -> HashSet<ColumnIndex> {
let epoch = slot.epoch(E::slots_per_epoch());
tester
.ctx
.network_globals
.chain
.as_ref()
.unwrap()
.sampling_columns()
.sampling_columns_for_epoch(epoch)
.iter()
.copied()
.collect()
}

View File

@@ -898,9 +898,9 @@ impl<E: EthSpec> Network<E> {
name = "libp2p",
skip_all
)]
pub fn subscribe_new_data_column_subnets(&mut self, custody_column_count: u64) {
pub fn subscribe_new_data_column_subnets(&mut self, sampling_column_count: u64) {
self.network_globals
.update_data_column_subnets(custody_column_count);
.update_data_column_subnets(sampling_column_count);
for column in self.network_globals.sampling_subnets() {
let kind = GossipKind::DataColumnSidecar(column);

View File

@@ -8,9 +8,7 @@ use parking_lot::RwLock;
use std::collections::HashSet;
use std::sync::Arc;
use tracing::error;
use types::data_column_custody_group::{
compute_columns_for_custody_group, compute_subnets_from_custody_group, get_custody_groups,
};
use types::data_column_custody_group::{compute_subnets_from_custody_group, get_custody_groups};
use types::{ChainSpec, ColumnIndex, DataColumnSubnetId, EthSpec};
pub struct NetworkGlobals<E: EthSpec> {
@@ -32,7 +30,6 @@ pub struct NetworkGlobals<E: EthSpec> {
pub backfill_state: RwLock<BackFillState>,
/// The computed sampling subnets and columns is stored to avoid re-computing.
pub sampling_subnets: RwLock<HashSet<DataColumnSubnetId>>,
pub sampling_columns: RwLock<HashSet<ColumnIndex>>,
/// Network-related configuration. Immutable after initialization.
pub config: Arc<NetworkConfig>,
/// Ethereum chain configuration. Immutable after initialization.
@@ -78,16 +75,8 @@ impl<E: EthSpec> NetworkGlobals<E> {
sampling_subnets.extend(subnets);
}
let mut sampling_columns = HashSet::new();
for custody_index in &custody_groups {
let columns = compute_columns_for_custody_group(*custody_index, &spec)
.expect("should compute custody columns for node");
sampling_columns.extend(columns);
}
tracing::debug!(
cgc = custody_group_count,
?sampling_columns,
?sampling_subnets,
"Starting node with custody params"
);
@@ -102,20 +91,15 @@ impl<E: EthSpec> NetworkGlobals<E> {
sync_state: RwLock::new(SyncState::Stalled),
backfill_state: RwLock::new(BackFillState::Paused),
sampling_subnets: RwLock::new(sampling_subnets),
sampling_columns: RwLock::new(sampling_columns),
config,
spec,
}
}
/// Update the sampling subnets based on an updated cgc.
pub fn update_data_column_subnets(&self, custody_group_count: u64) {
pub fn update_data_column_subnets(&self, sampling_size: u64) {
// The below `expect` calls will panic on start up if the chain spec config values used
// are invalid
let sampling_size = self
.spec
.sampling_size_custody_groups(custody_group_count)
.expect("should compute node sampling size from valid chain spec");
let custody_groups =
get_custody_groups(self.local_enr().node_id().raw(), sampling_size, &self.spec)
.expect("should compute node custody groups");
@@ -126,13 +110,6 @@ impl<E: EthSpec> NetworkGlobals<E> {
.expect("should compute custody subnets for node");
sampling_subnets.extend(subnets);
}
let mut sampling_columns = self.sampling_columns.write();
for custody_index in &custody_groups {
let columns = compute_columns_for_custody_group(*custody_index, &self.spec)
.expect("should compute custody columns for node");
sampling_columns.extend(columns);
}
}
/// Returns the local ENR from the underlying Discv5 behaviour that external peers may connect
@@ -248,10 +225,6 @@ impl<E: EthSpec> NetworkGlobals<E> {
}
}
pub fn sampling_columns(&self) -> HashSet<ColumnIndex> {
self.sampling_columns.read().clone()
}
pub fn sampling_subnets(&self) -> HashSet<DataColumnSubnetId> {
self.sampling_subnets.read().clone()
}
@@ -320,29 +293,6 @@ mod test {
);
}
#[test]
fn test_sampling_columns() {
create_test_tracing_subscriber();
let mut spec = E::default_spec();
spec.fulu_fork_epoch = Some(Epoch::new(0));
let custody_group_count = spec.number_of_custody_groups / 2;
let expected_sampling_columns = spec.sampling_size_columns(custody_group_count).unwrap();
let metadata = get_metadata(custody_group_count);
let config = Arc::new(NetworkConfig::default());
let globals = NetworkGlobals::<E>::new_test_globals_with_metadata(
vec![],
metadata,
config,
Arc::new(spec),
);
assert_eq!(
globals.sampling_columns.read().len(),
expected_sampling_columns as usize
);
}
fn get_metadata(custody_group_count: u64) -> MetaData<E> {
MetaData::V3(MetaDataV3 {
seq_number: 0,

View File

@@ -188,8 +188,8 @@ impl std::fmt::Display for GossipKind {
GossipKind::BlobSidecar(blob_index) => {
write!(f, "{}{}", BLOB_SIDECAR_PREFIX, blob_index)
}
GossipKind::DataColumnSidecar(column_index) => {
write!(f, "{}{}", DATA_COLUMN_SIDECAR_PREFIX, **column_index)
GossipKind::DataColumnSidecar(column_subnet_id) => {
write!(f, "{}{}", DATA_COLUMN_SIDECAR_PREFIX, **column_subnet_id)
}
x => f.write_str(x.as_ref()),
}
@@ -317,8 +317,8 @@ impl std::fmt::Display for GossipTopic {
GossipKind::BlobSidecar(blob_index) => {
format!("{}{}", BLOB_SIDECAR_PREFIX, blob_index)
}
GossipKind::DataColumnSidecar(index) => {
format!("{}{}", DATA_COLUMN_SIDECAR_PREFIX, *index)
GossipKind::DataColumnSidecar(column_subnet_id) => {
format!("{}{}", DATA_COLUMN_SIDECAR_PREFIX, *column_subnet_id)
}
GossipKind::BlsToExecutionChange => BLS_TO_EXECUTION_CHANGE_TOPIC.into(),
GossipKind::LightClientFinalityUpdate => LIGHT_CLIENT_FINALITY_UPDATE.into(),

View File

@@ -607,7 +607,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self: &Arc<Self>,
message_id: MessageId,
peer_id: PeerId,
_peer_client: Client,
subnet_id: DataColumnSubnetId,
column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
seen_duration: Duration,
@@ -623,7 +622,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
match self
.chain
.verify_data_column_sidecar_for_gossip(column_sidecar.clone(), *subnet_id)
.verify_data_column_sidecar_for_gossip(column_sidecar.clone(), subnet_id)
{
Ok(gossip_verified_data_column) => {
metrics::inc_counter(
@@ -650,6 +649,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
duration,
);
}
self.process_gossip_verified_data_column(
peer_id,
gossip_verified_data_column,

View File

@@ -227,7 +227,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self: &Arc<Self>,
message_id: MessageId,
peer_id: PeerId,
peer_client: Client,
subnet_id: DataColumnSubnetId,
column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
seen_timestamp: Duration,
@@ -238,7 +237,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.process_gossip_data_column_sidecar(
message_id,
peer_id,
peer_client,
subnet_id,
column_sidecar,
seen_timestamp,
@@ -753,7 +751,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
block_root: Hash256,
publish_blobs: bool,
) {
let custody_columns = self.network_globals.sampling_columns();
let epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
let custody_columns = self.chain.sampling_columns_for_epoch(epoch);
let self_cloned = self.clone();
let publish_fn = move |blobs_or_data_column| {
if publish_blobs {

View File

@@ -37,8 +37,9 @@ use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
AttesterSlashing, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecarList,
DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedVoluntaryExit, SingleAttestation, Slot, SubnetId,
DataColumnSubnetId, Epoch, EthSpec, Hash256, MainnetEthSpec, ProposerSlashing,
SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SingleAttestation, Slot,
SubnetId,
};
type E = MainnetEthSpec;
@@ -271,6 +272,8 @@ impl TestRig {
let (blob_sidecars, data_columns) = if let Some((kzg_proofs, blobs)) = next_block_tuple.1 {
if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
let kzg = get_kzg(&chain.spec);
let epoch = block.slot().epoch(E::slots_per_epoch());
let sampling_indices = chain.sampling_columns_for_epoch(epoch);
let custody_columns: DataColumnSidecarList<E> = blobs_to_data_column_sidecars(
&blobs.iter().collect_vec(),
kzg_proofs.clone().into_iter().collect_vec(),
@@ -280,7 +283,7 @@ impl TestRig {
)
.unwrap()
.into_iter()
.filter(|c| network_globals.sampling_columns().contains(&c.index))
.filter(|c| sampling_indices.contains(&c.index))
.collect::<Vec<_>>();
(None, Some(custody_columns))
@@ -357,7 +360,6 @@ impl TestRig {
.send_gossip_data_column_sidecar(
junk_message_id(),
junk_peer_id(),
Client::default(),
DataColumnSubnetId::from_column_index(data_column.index, &self.chain.spec),
data_column.clone(),
Duration::from_secs(0),
@@ -807,7 +809,8 @@ async fn accept_processed_gossip_data_columns_without_import() {
.unwrap()
.into_iter()
.map(|data_column| {
let subnet_id = data_column.index;
let subnet_id =
DataColumnSubnetId::from_column_index(data_column.index, &rig.chain.spec);
validate_data_column_sidecar_for_gossip::<_, DoNotObserve>(
data_column,
subnet_id,
@@ -820,7 +823,7 @@ async fn accept_processed_gossip_data_columns_without_import() {
let block_root = rig.next_block.canonical_root();
rig.chain
.data_availability_checker
.put_gossip_verified_data_columns(block_root, verified_data_columns)
.put_gossip_verified_data_columns(block_root, rig.next_block.slot(), verified_data_columns)
.expect("should put data columns into availability cache");
// WHEN an already processed but unobserved data column is received via gossip

View File

@@ -379,7 +379,6 @@ impl<T: BeaconChainTypes> Router<T> {
.send_gossip_data_column_sidecar(
message_id,
peer_id,
self.network_globals.client(&peer_id),
subnet_id,
column_sidecar,
timestamp_now(),

View File

@@ -551,7 +551,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// Attempt to find all required custody peers before sending any request or creating an ID
let columns_by_range_peers_to_request =
if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) {
let column_indexes = self.network_globals().sampling_columns();
let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch());
let column_indexes = self
.chain
.sampling_columns_for_epoch(epoch)
.iter()
.cloned()
.collect();
Some(self.select_columns_by_range_peers_to_request(
&column_indexes,
peers,
@@ -602,18 +608,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
})
.transpose()?;
let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch());
let info = RangeBlockComponentsRequest::new(
blocks_req_id,
blobs_req_id,
data_column_requests.map(|data_column_requests| {
(
data_column_requests,
self.network_globals()
.sampling_columns()
.clone()
.iter()
.copied()
.collect(),
self.chain.sampling_columns_for_epoch(epoch).to_vec(),
)
}),
);
@@ -1015,11 +1017,16 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.cached_data_column_indexes(&block_root)
.unwrap_or_default();
let current_epoch = self.chain.epoch().map_err(|e| {
RpcRequestSendError::InternalError(format!("Unable to read slot clock {:?}", e))
})?;
// Include only the blob indexes not yet imported (received through gossip)
let custody_indexes_to_fetch = self
.network_globals()
.sampling_columns()
.into_iter()
.chain
.sampling_columns_for_epoch(current_epoch)
.iter()
.copied()
.filter(|index| !custody_indexes_imported.contains(index))
.collect::<Vec<_>>();

View File

@@ -367,11 +367,11 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
pub fn processing_completed(
&mut self,
procesing_result: BatchProcessingResult,
processing_result: BatchProcessingResult,
) -> Result<BatchOperationOutcome, WrongState> {
match self.state.poison() {
BatchState::Processing(attempt) => {
self.state = match procesing_result {
self.state = match processing_result {
BatchProcessingResult::Success => BatchState::AwaitingValidation(attempt),
BatchProcessingResult::FaultyFailure => {
// register the failed attempt

View File

@@ -780,7 +780,7 @@ impl ChainSpec {
}
/// Returns the number of column sidecars to sample per slot.
pub fn sampling_size_columns(&self, custody_group_count: u64) -> Result<u64, String> {
pub fn sampling_size_columns(&self, custody_group_count: u64) -> Result<usize, String> {
let sampling_size_groups = self.sampling_size_custody_groups(custody_group_count)?;
let columns_per_custody_group = self
@@ -792,7 +792,7 @@ impl ChainSpec {
.safe_mul(sampling_size_groups)
.map_err(|_| "Computing sampling size should not overflow")?;
Ok(sampling_size_columns)
Ok(sampling_size_columns as usize)
}
/// Returns the number of custody groups to sample per slot.

View File

@@ -1,7 +1,6 @@
use crate::{ChainSpec, ColumnIndex, DataColumnSubnetId};
use alloy_primitives::U256;
use itertools::Itertools;
use maplit::hashset;
use safe_arith::{ArithError, SafeArith};
use std::collections::HashSet;
@@ -25,13 +24,32 @@ pub fn get_custody_groups(
custody_group_count: u64,
spec: &ChainSpec,
) -> Result<HashSet<CustodyIndex>, DataColumnCustodyGroupError> {
get_custody_groups_ordered(raw_node_id, custody_group_count, spec)
.map(|custody_groups| custody_groups.into_iter().collect())
}
/// Returns a deterministically ordered list of custody groups assigned to a node,
/// preserving the order in which they were computed during iteration.
///
/// # Arguments
/// * `raw_node_id` - 32-byte node identifier
/// * `custody_group_count` - Number of custody groups to generate
/// * `spec` - Chain specification containing custody group parameters
///
/// # Returns
/// Vector of custody group indices in computation order or error if parameters are invalid
pub fn get_custody_groups_ordered(
raw_node_id: [u8; 32],
custody_group_count: u64,
spec: &ChainSpec,
) -> Result<Vec<CustodyIndex>, DataColumnCustodyGroupError> {
if custody_group_count > spec.number_of_custody_groups {
return Err(DataColumnCustodyGroupError::InvalidCustodyGroupCount(
custody_group_count,
));
}
let mut custody_groups: HashSet<u64> = hashset![];
let mut custody_groups = vec![];
let mut current_id = U256::from_be_slice(&raw_node_id);
while custody_groups.len() < custody_group_count as usize {
let mut node_id_bytes = [0u8; 32];
@@ -44,7 +62,9 @@ pub fn get_custody_groups(
let custody_group = hash_prefix_u64
.safe_rem(spec.number_of_custody_groups)
.expect("spec.number_of_custody_groups must not be zero");
custody_groups.insert(custody_group);
if !custody_groups.contains(&custody_group) {
custody_groups.push(custody_group);
}
current_id = current_id.wrapping_add(U256::from(1u64));
}

View File

@@ -28,8 +28,8 @@ use std::time::Duration;
use types::{
Attestation, AttestationRef, AttesterSlashing, AttesterSlashingRef, BeaconBlock, BeaconState,
BlobSidecar, BlobsList, BlockImportSource, Checkpoint, DataColumnSidecarList,
ExecutionBlockHash, Hash256, IndexedAttestation, KzgProof, ProposerPreparationData,
SignedBeaconBlock, Slot, Uint256,
DataColumnSubnetId, ExecutionBlockHash, Hash256, IndexedAttestation, KzgProof,
ProposerPreparationData, SignedBeaconBlock, Slot, Uint256,
};
// When set to true, cache any states fetched from the db.
@@ -520,7 +520,8 @@ impl<E: EthSpec> Tester<E> {
let gossip_verified_data_columns = columns
.into_iter()
.map(|column| {
GossipVerifiedDataColumn::new(column.clone(), column.index, &self.harness.chain)
let subnet_id = DataColumnSubnetId::from_column_index(column.index, &self.spec);
GossipVerifiedDataColumn::new(column.clone(), subnet_id, &self.harness.chain)
.unwrap_or_else(|_| {
data_column_success = false;
GossipVerifiedDataColumn::__new_for_testing(column)