Refactor Custody Context Availability Checks (#9515)

Co-Authored-By: Mark Mackey <mark@sigmaprime.io>
This commit is contained in:
ethDreamer
2026-06-24 21:53:43 -05:00
committed by GitHub
parent 99fb99c941
commit a4c4cccf04
39 changed files with 939 additions and 830 deletions

View File

@@ -18,7 +18,7 @@ use tracing::{debug, error, instrument};
use types::data::{BlobIdentifier, FixedBlobSidecarList, PartialDataColumn};
use types::{
BlobSidecar, BlobSidecarList, BlockImportSource, ChainSpec, DataColumnSidecar,
DataColumnSidecarList, Epoch, EthSpec, Hash256, PartialDataColumnSidecarError,
DataColumnSidecarList, EthSpec, Hash256, PartialDataColumnSidecarError,
PartialDataColumnSidecarRef, SignedBeaconBlock, Slot,
};
@@ -75,12 +75,9 @@ const OVERFLOW_LRU_CAPACITY: usize = 32;
/// proposer. Having a capacity > 1 is an optimization to prevent sync lookup from having re-fetch
/// data during moments of unstable network conditions.
pub struct DataAvailabilityChecker<T: BeaconChainTypes> {
complete_blob_backfill: bool,
availability_cache: Arc<DataAvailabilityCheckerInner<T>>,
partial_assembler: Option<Arc<PartialDataColumnAssembler<T::EthSpec>>>,
slot_clock: T::SlotClock,
kzg: Arc<Kzg>,
custody_context: Arc<CustodyContext<T::EthSpec>>,
spec: Arc<ChainSpec>,
}
@@ -115,10 +112,8 @@ impl<E: EthSpec> Debug for Availability<E> {
impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
pub fn new(
complete_blob_backfill: bool,
slot_clock: T::SlotClock,
kzg: Arc<Kzg>,
custody_context: Arc<CustodyContext<T::EthSpec>>,
custody_context: Arc<CustodyContext<T>>,
spec: Arc<ChainSpec>,
enable_partial_columns: bool,
disable_get_blobs: bool,
@@ -137,18 +132,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
None
};
Ok(Self {
complete_blob_backfill,
partial_assembler,
availability_cache: Arc::new(inner),
slot_clock,
kzg,
custody_context,
spec,
})
}
pub fn custody_context(&self) -> &Arc<CustodyContext<T::EthSpec>> {
&self.custody_context
fn custody_context(&self) -> &Arc<CustodyContext<T>> {
self.availability_cache.custody_context()
}
pub fn partial_assembler(&self) -> Option<&Arc<PartialDataColumnAssembler<T::EthSpec>>> {
@@ -310,9 +302,9 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
&self,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
slot_clock: &T::SlotClock,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let seen_timestamp = self
.slot_clock
let seen_timestamp = slot_clock
.now_duration()
.ok_or(AvailabilityCheckError::SlotClockError)?;
@@ -350,9 +342,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
// not be yet effective for data availability check, as CGC changes are only effecive from
// a new epoch.
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
let sampling_columns = self
.custody_context
.sampling_columns_for_epoch(epoch, &self.spec);
let sampling_columns = self.custody_context().sampling_columns_for_epoch(epoch);
let verified_custody_columns = kzg_verified_columns
.into_iter()
.filter(|col| sampling_columns.contains(&col.index()))
@@ -390,9 +380,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
data_columns: I,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
let sampling_columns = self
.custody_context
.sampling_columns_for_epoch(epoch, &self.spec);
let sampling_columns = self.custody_context().sampling_columns_for_epoch(epoch);
let custody_columns = data_columns
.into_iter()
.filter(|col| sampling_columns.contains(&col.index()))
@@ -507,59 +495,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
Ok(())
}
/// Determines the blob requirements for a block. If the block is pre-deneb, no blobs are required.
/// If the epoch is from prior to the data availability boundary, no blobs are required.
pub fn blobs_required_for_epoch(&self, epoch: Epoch) -> bool {
self.da_check_required_for_epoch(epoch) && !self.spec.is_peer_das_enabled_for_epoch(epoch)
}
/// Determines the data column requirements for an epoch.
/// - If the epoch is pre-peerdas, no data columns are required.
/// - If the epoch is from prior to the data availability boundary, no data columns are required.
pub fn data_columns_required_for_epoch(&self, epoch: Epoch) -> bool {
self.da_check_required_for_epoch(epoch) && self.spec.is_peer_das_enabled_for_epoch(epoch)
}
/// See `Self::blobs_required_for_epoch`
fn blobs_required_for_block(&self, block: &SignedBeaconBlock<T::EthSpec>) -> bool {
block.num_expected_blobs() > 0 && self.blobs_required_for_epoch(block.epoch())
}
/// See `Self::data_columns_required_for_epoch`
fn data_columns_required_for_block(&self, block: &SignedBeaconBlock<T::EthSpec>) -> bool {
block.num_expected_blobs() > 0 && self.data_columns_required_for_epoch(block.epoch())
}
/// The epoch at which we require a data availability check in block processing.
/// `None` if the `Deneb` fork is disabled.
pub fn data_availability_boundary(&self) -> Option<Epoch> {
let fork_epoch = self.spec.deneb_fork_epoch?;
if self.complete_blob_backfill {
Some(fork_epoch)
} else {
let current_epoch = self.slot_clock.now()?.epoch(T::EthSpec::slots_per_epoch());
self.spec
.min_epoch_data_availability_boundary(current_epoch)
}
}
/// Returns true if the given epoch lies within the da boundary and false otherwise.
pub fn da_check_required_for_epoch(&self, block_epoch: Epoch) -> bool {
self.data_availability_boundary()
.is_some_and(|da_epoch| block_epoch >= da_epoch)
}
/// Returns `true` if the current epoch is greater than or equal to the `Deneb` epoch.
pub fn is_deneb(&self) -> bool {
self.slot_clock.now().is_some_and(|slot| {
self.spec.deneb_fork_epoch.is_some_and(|deneb_epoch| {
let now_epoch = slot.epoch(T::EthSpec::slots_per_epoch());
now_epoch >= deneb_epoch
})
})
}
/// Collects metrics from the data availability checker.
pub fn metrics(&self) -> DataAvailabilityCheckerMetrics {
DataAvailabilityCheckerMetrics {
@@ -629,7 +564,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
let columns_to_sample = self
.custody_context()
.sampling_columns_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()), &self.spec);
.sampling_columns_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()));
// We only need to import and publish columns that we need to sample
// and columns that we haven't already received
@@ -886,15 +821,14 @@ impl<E: EthSpec> AvailableBlock<E> {
pub fn new<T>(
block: Arc<SignedBeaconBlock<T::EthSpec>>,
block_data: AvailableBlockData<T::EthSpec>,
da_checker: &DataAvailabilityChecker<T>,
spec: Arc<ChainSpec>,
custody_context: &CustodyContext<T>,
) -> Result<Self, AvailabilityCheckError>
where
T: BeaconChainTypes<EthSpec = E>,
{
// Ensure block availability
let blobs_required = da_checker.blobs_required_for_block(&block);
let columns_required = da_checker.data_columns_required_for_block(&block);
let blobs_required = custody_context.blobs_required_for_block(&block);
let columns_required = custody_context.data_columns_required_for_block(&block);
match &block_data {
AvailableBlockData::NoData => {
@@ -935,9 +869,8 @@ impl<E: EthSpec> AvailableBlock<E> {
return Err(AvailabilityCheckError::InvalidAvailableBlockData);
}
let mut column_indices = da_checker
.custody_context
.sampling_columns_for_epoch(block.epoch(), &spec)
let mut column_indices = custody_context
.sampling_columns_for_epoch(block.epoch())
.iter()
.collect::<HashSet<_>>();
@@ -1081,7 +1014,8 @@ mod test {
use std::time::Duration;
use types::data::DataColumn;
use types::{
ChainSpec, ColumnIndex, DataColumnSidecarFulu, EthSpec, ForkName, MainnetEthSpec, Slot,
ChainSpec, ColumnIndex, DataColumnSidecarFulu, Epoch, EthSpec, ForkName, MainnetEthSpec,
Slot,
};
type E = MainnetEthSpec;
@@ -1096,7 +1030,7 @@ mod test {
let mut u = types::test_utils::test_unstructured();
let da_checker = new_da_checker(spec.clone());
let custody_context = &da_checker.custody_context;
let custody_context = da_checker.custody_context();
// GIVEN a single 32 ETH validator is attached slot 0
let epoch = Epoch::new(0);
@@ -1104,10 +1038,9 @@ mod test {
custody_context.register_validators(
vec![(validator_0, 32_000_000_000)],
epoch.start_slot(E::slots_per_epoch()),
&spec,
);
assert_eq!(
custody_context.num_of_data_columns_to_sample(epoch, &spec),
custody_context.num_of_data_columns_to_sample(epoch),
spec.validator_custody_requirement as usize,
"sampling size should be the minimal custody requirement == 8"
);
@@ -1115,11 +1048,8 @@ mod test {
// WHEN additional attached validators result in a CGC increase to 10 at the end slot of the same epoch
let validator_1 = 1;
let cgc_change_slot = epoch.end_slot(E::slots_per_epoch());
custody_context.register_validators(
vec![(validator_1, 32_000_000_000 * 9)],
cgc_change_slot,
&spec,
);
custody_context
.register_validators(vec![(validator_1, 32_000_000_000 * 9)], cgc_change_slot);
// AND custody columns (8) and any new extra columns (2) are received via RPC responses.
// NOTE: block lookup uses the **latest** CGC (10) instead of the effective CGC (8) as the slot is unknown.
let (_, data_columns) = generate_rand_block_and_data_columns::<E>(
@@ -1134,7 +1064,7 @@ mod test {
// The CGC change becomes effective after CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS,
// which is typically epoch 2+ for MinimalEthSpec.
let future_epoch = Epoch::new(10); // Far enough in the future to have the CGC change effective
let requested_columns = custody_context.sampling_columns_for_epoch(future_epoch, &spec);
let requested_columns = custody_context.sampling_columns_for_epoch(future_epoch);
assert_eq!(
requested_columns.len(),
10,
@@ -1152,7 +1082,7 @@ mod test {
.expect("should put rpc custody columns");
// THEN the sampling size for the end slot of the same epoch remains unchanged
let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec);
let sampling_columns = custody_context.sampling_columns_for_epoch(epoch);
assert_eq!(
sampling_columns.len(),
spec.validator_custody_requirement as usize // 8
@@ -1183,7 +1113,7 @@ mod test {
let mut u = types::test_utils::test_unstructured();
let da_checker = new_da_checker(spec.clone());
let custody_context = &da_checker.custody_context;
let custody_context = da_checker.custody_context();
// GIVEN a single 32 ETH validator is attached slot 0
let epoch = Epoch::new(0);
@@ -1191,10 +1121,9 @@ mod test {
custody_context.register_validators(
vec![(validator_0, 32_000_000_000)],
epoch.start_slot(E::slots_per_epoch()),
&spec,
);
assert_eq!(
custody_context.num_of_data_columns_to_sample(epoch, &spec),
custody_context.num_of_data_columns_to_sample(epoch),
spec.validator_custody_requirement as usize,
"sampling size should be the minimal custody requirement == 8"
);
@@ -1202,11 +1131,8 @@ mod test {
// WHEN additional attached validators result in a CGC increase to 10 at the end slot of the same epoch
let validator_1 = 1;
let cgc_change_slot = epoch.end_slot(E::slots_per_epoch());
custody_context.register_validators(
vec![(validator_1, 32_000_000_000 * 9)],
cgc_change_slot,
&spec,
);
custody_context
.register_validators(vec![(validator_1, 32_000_000_000 * 9)], cgc_change_slot);
// AND custody columns (8) and any new extra columns (2) are received via gossip.
// NOTE: CGC updates results in new topics subscriptions immediately, and extra columns may start to
// arrive via gossip.
@@ -1222,7 +1148,7 @@ mod test {
// The CGC change becomes effective after CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS,
// which is typically epoch 2+ for MinimalEthSpec.
let future_epoch = Epoch::new(10); // Far enough in the future to have the CGC change effective
let requested_columns = custody_context.sampling_columns_for_epoch(future_epoch, &spec);
let requested_columns = custody_context.sampling_columns_for_epoch(future_epoch);
assert_eq!(
requested_columns.len(),
10,
@@ -1238,7 +1164,7 @@ mod test {
.expect("should put gossip custody columns");
// THEN the sampling size for the end slot of the same epoch remains unchanged
let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec);
let sampling_columns = custody_context.sampling_columns_for_epoch(epoch);
assert_eq!(
sampling_columns.len(),
spec.validator_custody_requirement as usize // 8
@@ -1305,8 +1231,7 @@ mod test {
};
let block_data = AvailableBlockData::new_with_data_columns(custody_columns);
let da_checker = Arc::new(new_da_checker(spec.clone()));
RangeSyncBlock::new(Arc::new(block), block_data, &da_checker, spec.clone())
RangeSyncBlock::new(Arc::new(block), block_data, da_checker.custody_context())
.expect("should create RPC block with custody columns")
})
.collect::<Vec<_>>();
@@ -1331,16 +1256,15 @@ mod test {
let mut u = types::test_utils::test_unstructured();
let da_checker = new_da_checker(spec.clone());
let custody_context = &da_checker.custody_context;
let custody_context = da_checker.custody_context();
// Set custody requirement to 65 columns (enough to trigger reconstruction)
let epoch = Epoch::new(1);
custody_context.register_validators(
vec![(0, 2_048_000_000_000), (1, 32_000_000_000)], // 64 + 1
Slot::new(0),
&spec,
);
let sampling_requirement = custody_context.num_of_data_columns_to_sample(epoch, &spec);
let sampling_requirement = custody_context.num_of_data_columns_to_sample(epoch);
assert_eq!(
sampling_requirement, 65,
"sampling requirement should be 65"
@@ -1362,7 +1286,7 @@ mod test {
// Add 64 columns to the da checker (enough to be able to reconstruct)
// Order by all_column_indices_ordered, then take first 64
let custody_columns = custody_context.sampling_columns_for_epoch(epoch, &spec);
let custody_columns = custody_context.sampling_columns_for_epoch(epoch);
let custody_columns = custody_columns
.iter()
.filter_map(|&col_idx| data_columns.iter().find(|d| *d.index() == col_idx).cloned())
@@ -1400,7 +1324,7 @@ mod test {
);
// Only the columns required for custody (65) should be imported into the cache
let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec);
let sampling_columns = custody_context.sampling_columns_for_epoch(epoch);
let actual_cached: HashSet<ColumnIndex> = da_checker
.cached_data_column_indexes(&block_root)
.expect("should have cached data columns")
@@ -1421,21 +1345,15 @@ mod test {
);
let kzg = get_kzg(&spec);
let ordered_custody_column_indices = generate_data_column_indices_rand_order::<E>();
let complete_blob_backfill = false;
let custody_context = Arc::new(CustodyContext::new(
NodeCustodyType::Fullnode,
ordered_custody_column_indices,
&spec,
));
let complete_blob_backfill = false;
DataAvailabilityChecker::new(
complete_blob_backfill,
slot_clock,
kzg,
custody_context,
spec,
true,
false,
)
.expect("should initialise data availability checker")
complete_blob_backfill,
spec.clone(),
));
DataAvailabilityChecker::new(kzg, custody_context, spec, true, false)
.expect("should initialise data availability checker")
}
}