mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-31 13:17:09 +00:00
Refine Gloas data column availability
This commit is contained in:
@@ -55,13 +55,12 @@ use task_executor::TaskExecutor;
|
||||
use tracing::{Span, debug, error, instrument, trace};
|
||||
use types::{
|
||||
ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256,
|
||||
PartialDataColumnSidecarRef, SignedBeaconBlock, Slot,
|
||||
PartialDataColumnSidecarRef,
|
||||
};
|
||||
|
||||
mod pending_column;
|
||||
mod pending_components;
|
||||
|
||||
use crate::block_verification_types::AsBlock;
|
||||
use crate::data_column_verification::{
|
||||
GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn,
|
||||
};
|
||||
@@ -69,6 +68,7 @@ use crate::metrics::{
|
||||
KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES,
|
||||
};
|
||||
use crate::observed_data_sidecars::ObservationStrategy;
|
||||
pub(crate) use pending_components::PendingPayloadBid;
|
||||
use pending_components::{PendingComponents, ReconstructColumnsDecision};
|
||||
use types::new_non_zero_usize;
|
||||
|
||||
@@ -153,19 +153,7 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
|
||||
block_root: Hash256,
|
||||
) -> Option<DataColumnSidecarList<T::EthSpec>> {
|
||||
self.peek_pending_components(&block_root, |components| {
|
||||
components.map(|c| {
|
||||
c.verified_data_columns
|
||||
.iter()
|
||||
.filter_map(|(col_idx, col)| {
|
||||
col.try_to_sidecar(
|
||||
*col_idx,
|
||||
c.block.slot(),
|
||||
block_root,
|
||||
c.num_blobs_expected(),
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
components.map(|c| c.get_cached_data_columns(block_root))
|
||||
})
|
||||
}
|
||||
|
||||
@@ -177,6 +165,13 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Return the cached Gloas payload bid metadata for `block_root`, if present.
|
||||
pub fn get_bid(&self, block_root: &Hash256) -> Option<PendingPayloadBid<T::EthSpec>> {
|
||||
self.peek_pending_components(block_root, |components| {
|
||||
components.map(|components| components.bid.clone())
|
||||
})
|
||||
}
|
||||
|
||||
/// Filter out cells that are already cached for the given column sidecar.
|
||||
/// Returns the cells that still need KZG verification, or `None` if all cells are cached.
|
||||
#[instrument(skip_all, level = "trace")]
|
||||
@@ -206,12 +201,13 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
|
||||
/// Insert an executed payload envelope into the cache and performs an availability check
|
||||
pub fn put_executed_payload_envelope(
|
||||
&self,
|
||||
bid: PendingPayloadBid<T::EthSpec>,
|
||||
executed_envelope: AvailabilityPendingExecutedEnvelope<T::EthSpec>,
|
||||
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
|
||||
let epoch = executed_envelope.envelope.epoch();
|
||||
let beacon_block_root = executed_envelope.envelope.beacon_block_root();
|
||||
let pending_components =
|
||||
self.get_pending_components(beacon_block_root, |pending_components| {
|
||||
self.get_pending_components(beacon_block_root, bid, |pending_components| {
|
||||
pending_components.insert_executed_payload_envelope(executed_envelope);
|
||||
Ok(())
|
||||
})?;
|
||||
@@ -229,16 +225,10 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
|
||||
self.check_availability(beacon_block_root, pending_components, num_expected_columns)
|
||||
}
|
||||
|
||||
/// Initialize pending components for a block. Called when the beacon block (containing the
|
||||
/// bid) arrives. Sets up the slot and expected blob count so that subsequent column insertions
|
||||
/// know how many cells to expect per column.
|
||||
pub fn init_pending_block(
|
||||
&self,
|
||||
block_root: Hash256,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||
) {
|
||||
/// Initialize pending components for a block's Gloas bid.
|
||||
pub fn init_pending_bid(&self, block_root: Hash256, bid: PendingPayloadBid<T::EthSpec>) {
|
||||
let mut write_lock = self.availability_cache.write();
|
||||
write_lock.get_or_insert_mut(block_root, || PendingComponents::empty(block_root, block));
|
||||
write_lock.get_or_insert_mut(block_root, || PendingComponents::empty(block_root, bid));
|
||||
}
|
||||
|
||||
/// Perform KZG verification on RPC custody columns and insert them into the cache.
|
||||
@@ -247,14 +237,17 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
|
||||
pub fn put_rpc_custody_columns(
|
||||
&self,
|
||||
block_root: Hash256,
|
||||
slot: Slot,
|
||||
bid: PendingPayloadBid<T::EthSpec>,
|
||||
custody_columns: DataColumnSidecarList<T::EthSpec>,
|
||||
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
|
||||
let kzg_verified_columns =
|
||||
KzgVerifiedDataColumn::from_batch_with_scoring(custody_columns, &self.kzg)
|
||||
.map_err(AvailabilityCheckError::InvalidColumn)?;
|
||||
let kzg_verified_columns = KzgVerifiedDataColumn::from_batch_with_scoring_and_commitments(
|
||||
custody_columns,
|
||||
bid.blob_kzg_commitments.as_ref(),
|
||||
&self.kzg,
|
||||
)
|
||||
.map_err(AvailabilityCheckError::InvalidColumn)?;
|
||||
|
||||
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
|
||||
let epoch = bid.slot.epoch(T::EthSpec::slots_per_epoch());
|
||||
let sampling_columns = self
|
||||
.custody_context
|
||||
.sampling_columns_for_epoch(epoch, &self.spec);
|
||||
@@ -264,7 +257,7 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
|
||||
.map(KzgVerifiedCustodyDataColumn::from_asserted_custody)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
self.put_kzg_verified_custody_data_columns(block_root, verified_custody_columns)
|
||||
self.put_kzg_verified_custody_data_columns(block_root, bid, verified_custody_columns)
|
||||
}
|
||||
|
||||
/// Perform KZG verification on gossip verified custody columns and insert them into the cache.
|
||||
@@ -273,10 +266,10 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
|
||||
pub fn put_gossip_verified_data_columns<O: ObservationStrategy>(
|
||||
&self,
|
||||
block_root: Hash256,
|
||||
slot: Slot,
|
||||
bid: PendingPayloadBid<T::EthSpec>,
|
||||
data_columns: Vec<GossipVerifiedDataColumn<T, O>>,
|
||||
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
|
||||
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
|
||||
let epoch = bid.slot.epoch(T::EthSpec::slots_per_epoch());
|
||||
let sampling_columns = self
|
||||
.custody_context
|
||||
.sampling_columns_for_epoch(epoch, &self.spec);
|
||||
@@ -286,7 +279,7 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
|
||||
.map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
self.put_kzg_verified_custody_data_columns(block_root, custody_columns)
|
||||
self.put_kzg_verified_custody_data_columns(block_root, bid, custody_columns)
|
||||
}
|
||||
|
||||
/// Insert KZG verified columns into the cache.
|
||||
@@ -294,11 +287,13 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
|
||||
pub fn put_kzg_verified_custody_data_columns(
|
||||
&self,
|
||||
block_root: Hash256,
|
||||
bid: PendingPayloadBid<T::EthSpec>,
|
||||
kzg_verified_data_columns: Vec<KzgVerifiedCustodyDataColumn<T::EthSpec>>,
|
||||
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
|
||||
let pending_components = self.get_pending_components(block_root, |pending_components| {
|
||||
pending_components.merge_data_columns(kzg_verified_data_columns)
|
||||
})?;
|
||||
let pending_components =
|
||||
self.get_pending_components(block_root, bid, |pending_components| {
|
||||
pending_components.merge_data_columns(kzg_verified_data_columns)
|
||||
})?;
|
||||
|
||||
let num_expected_columns = self.get_num_expected_columns(pending_components.epoch());
|
||||
|
||||
@@ -317,6 +312,7 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
|
||||
pub fn reconstruct_data_columns(
|
||||
&self,
|
||||
block_root: &Hash256,
|
||||
bid: PendingPayloadBid<T::EthSpec>,
|
||||
) -> Result<DataColumnReconstructionResult<T::EthSpec>, AvailabilityCheckError> {
|
||||
let verified_data_columns = match self.check_and_set_reconstruction_started(block_root) {
|
||||
ReconstructColumnsDecision::Yes(verified_data_columns) => verified_data_columns,
|
||||
@@ -324,6 +320,10 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
|
||||
return Ok(DataColumnReconstructionResult::NotStarted(reason));
|
||||
}
|
||||
};
|
||||
let existing_column_indices = verified_data_columns
|
||||
.iter()
|
||||
.map(|data_column| *data_column.index())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
metrics::inc_counter(&KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS);
|
||||
let timer = metrics::start_timer(&metrics::DATA_AVAILABILITY_RECONSTRUCTION_TIME);
|
||||
@@ -344,12 +344,6 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
|
||||
AvailabilityCheckError::ReconstructColumnsError(e)
|
||||
})?;
|
||||
|
||||
let Some(existing_column_indices) = self.cached_data_column_indexes(block_root) else {
|
||||
return Err(AvailabilityCheckError::Unexpected(
|
||||
"block no longer exists in the data availability checker".to_string(),
|
||||
));
|
||||
};
|
||||
|
||||
let Some(slot) = all_data_columns.first().map(|d| d.as_data_column().slot()) else {
|
||||
return Ok(DataColumnReconstructionResult::RecoveredColumnsNotImported(
|
||||
"No new columns to import and publish",
|
||||
@@ -383,6 +377,7 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
|
||||
|
||||
self.put_kzg_verified_custody_data_columns(
|
||||
*block_root,
|
||||
bid,
|
||||
data_columns_to_import_and_publish.clone(),
|
||||
)
|
||||
.map(|availability| {
|
||||
@@ -436,16 +431,15 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets existing `PendingComponents` and applies the `update_fn` while holding the write lock.
|
||||
/// Gets or creates `PendingComponents` and applies the `update_fn` while holding the write lock.
|
||||
///
|
||||
/// Once the update is complete, the write lock is downgraded and a read guard with a
|
||||
/// reference of the updated `PendingComponents` is returned.
|
||||
///
|
||||
/// Returns an error if no pending components exist for the given block root (the block must
|
||||
/// be initialized via `init_pending_block` first).
|
||||
fn get_pending_components<F>(
|
||||
&self,
|
||||
block_root: Hash256,
|
||||
bid: PendingPayloadBid<T::EthSpec>,
|
||||
update_fn: F,
|
||||
) -> Result<MappedRwLockReadGuard<'_, PendingComponents<T::EthSpec>>, AvailabilityCheckError>
|
||||
where
|
||||
@@ -454,11 +448,8 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
|
||||
let mut write_lock = self.availability_cache.write();
|
||||
|
||||
{
|
||||
let pending_components = write_lock.get_mut(&block_root).ok_or_else(|| {
|
||||
AvailabilityCheckError::Unexpected(
|
||||
"pending components not initialized for block".to_string(),
|
||||
)
|
||||
})?;
|
||||
let pending_components = write_lock
|
||||
.get_or_insert_mut(block_root, || PendingComponents::empty(block_root, bid));
|
||||
update_fn(pending_components)?
|
||||
}
|
||||
|
||||
@@ -634,7 +625,6 @@ mod data_availability_checker_tests {
|
||||
use crate::block_verification::PayloadVerificationOutcome;
|
||||
use crate::test_utils::{
|
||||
NumBlobs, generate_data_column_indices_rand_order, generate_rand_block_and_data_columns,
|
||||
test_spec,
|
||||
};
|
||||
use crate::{
|
||||
custody_context::NodeCustodyType,
|
||||
@@ -652,8 +642,10 @@ mod data_availability_checker_tests {
|
||||
};
|
||||
|
||||
type E = MinimalEthSpec;
|
||||
type T = DiskHarnessType<E>;
|
||||
|
||||
const LOW_VALIDATOR_COUNT: usize = 32;
|
||||
const RNG_SEED: u64 = 0xDEADBEEF;
|
||||
|
||||
fn gloas_spec<E: EthSpec>() -> Arc<ChainSpec> {
|
||||
let mut spec = E::default_spec();
|
||||
@@ -703,18 +695,7 @@ mod data_availability_checker_tests {
|
||||
.build()
|
||||
}
|
||||
|
||||
async fn setup_harness_and_cache<T>() -> (
|
||||
BeaconChainHarness<DiskHarnessType<E>>,
|
||||
Arc<PendingPayloadCache<T>>,
|
||||
TempDir,
|
||||
)
|
||||
where
|
||||
T: BeaconChainTypes<
|
||||
HotStore = BeaconNodeBackend<E>,
|
||||
ColdStore = BeaconNodeBackend<E>,
|
||||
EthSpec = E,
|
||||
>,
|
||||
{
|
||||
async fn setup() -> (BeaconChainHarness<T>, Arc<PendingPayloadCache<T>>, TempDir) {
|
||||
create_test_tracing_subscriber();
|
||||
let chain_db_path = tempdir().expect("should get temp dir");
|
||||
let harness = get_gloas_chain::<E>(&chain_db_path).await;
|
||||
@@ -732,22 +713,6 @@ mod data_availability_checker_tests {
|
||||
(harness, cache, chain_db_path)
|
||||
}
|
||||
|
||||
fn is_gloas_enabled() -> bool {
|
||||
let spec = test_spec::<E>();
|
||||
spec.fork_name_at_slot::<E>(Slot::new(0)).gloas_enabled()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cache_creation() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (_harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
assert_eq!(cache.block_cache_size(), 0);
|
||||
}
|
||||
|
||||
fn make_test_signed_envelope(block_root: Hash256) -> Arc<SignedExecutionPayloadEnvelope<E>> {
|
||||
Arc::new(SignedExecutionPayloadEnvelope {
|
||||
message: ExecutionPayloadEnvelope {
|
||||
@@ -771,311 +736,132 @@ mod data_availability_checker_tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn init_block(
|
||||
cache: &PendingPayloadCache<T>,
|
||||
spec: &ChainSpec,
|
||||
num_blobs: NumBlobs,
|
||||
seed: u64,
|
||||
) -> (PendingPayloadBid<E>, Hash256, DataColumnSidecarList<E>) {
|
||||
let mut rng = StdRng::seed_from_u64(seed);
|
||||
let (block, data_columns) =
|
||||
generate_rand_block_and_data_columns::<E>(ForkName::Gloas, num_blobs, &mut rng, spec);
|
||||
let block_root = block.canonical_root();
|
||||
let bid = PendingPayloadBid::from_block(&block).expect("should get payload bid");
|
||||
cache.init_pending_bid(block_root, bid.clone());
|
||||
(bid, block_root, data_columns)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_put_columns_creates_pending_components() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
async fn caches_and_deduplicates_columns() {
|
||||
let (harness, cache, _path) = setup().await;
|
||||
let (bid, block_root, data_columns) =
|
||||
init_block(&cache, &harness.spec, NumBlobs::Number(1), RNG_SEED);
|
||||
let column = data_columns.first().cloned().expect("should have column");
|
||||
let column_index = *column.index();
|
||||
|
||||
for _ in 0..2 {
|
||||
cache
|
||||
.put_rpc_custody_columns(block_root, bid.clone(), vec![column.clone()])
|
||||
.expect("should put column");
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
|
||||
let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
|
||||
ForkName::Gloas,
|
||||
NumBlobs::Number(1),
|
||||
&mut rng,
|
||||
&spec,
|
||||
assert_eq!(
|
||||
cache.cached_data_column_indexes(&block_root),
|
||||
Some(vec![column_index])
|
||||
);
|
||||
assert_eq!(
|
||||
cache.get_data_columns(block_root).map(|cols| cols.len()),
|
||||
Some(1)
|
||||
);
|
||||
let slot = block.slot();
|
||||
let block_root = Hash256::random();
|
||||
cache.init_pending_block(block_root, Arc::new(block));
|
||||
|
||||
let columns: DataColumnSidecarList<E> = data_columns.into_iter().take(1).collect();
|
||||
|
||||
let result = cache.put_rpc_custody_columns(block_root, slot, columns);
|
||||
assert!(result.is_ok(), "put_rpc_custody_columns failed: {result:?}");
|
||||
|
||||
assert_eq!(cache.block_cache_size(), 1);
|
||||
|
||||
let cached_indices = cache.peek_pending_components(&block_root, |components| {
|
||||
components.map(|c| c.get_cached_data_columns_indices())
|
||||
});
|
||||
assert!(cached_indices.is_some());
|
||||
assert_eq!(cached_indices.unwrap().len(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_column_deduplication() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
|
||||
let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
|
||||
ForkName::Gloas,
|
||||
NumBlobs::Number(1),
|
||||
&mut rng,
|
||||
&spec,
|
||||
);
|
||||
let slot = block.slot();
|
||||
let block_root = Hash256::random();
|
||||
cache.init_pending_block(block_root, Arc::new(block));
|
||||
|
||||
let first_column = data_columns.first().cloned().expect("should have column");
|
||||
let column_index = *first_column.index();
|
||||
|
||||
cache
|
||||
.put_rpc_custody_columns(block_root, slot, vec![first_column.clone()])
|
||||
.expect("should put column");
|
||||
|
||||
cache
|
||||
.put_rpc_custody_columns(block_root, slot, vec![first_column])
|
||||
.expect("should put column again");
|
||||
|
||||
let cached_indices = cache.peek_pending_components(&block_root, |components| {
|
||||
components.map(|c| c.get_cached_data_columns_indices())
|
||||
});
|
||||
assert!(cached_indices.is_some());
|
||||
let indices = cached_indices.unwrap();
|
||||
assert_eq!(indices.len(), 1);
|
||||
assert_eq!(indices[0], column_index);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_columns_without_envelope_not_available() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
|
||||
let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
|
||||
ForkName::Gloas,
|
||||
NumBlobs::Number(1),
|
||||
&mut rng,
|
||||
&spec,
|
||||
);
|
||||
let slot = block.slot();
|
||||
let block_root = Hash256::random();
|
||||
cache.init_pending_block(block_root, Arc::new(block));
|
||||
async fn requires_columns_and_executed_envelope() {
|
||||
let (harness, cache, _path) = setup().await;
|
||||
let (bid, block_root, data_columns) =
|
||||
init_block(&cache, &harness.spec, NumBlobs::Number(1), RNG_SEED);
|
||||
|
||||
let result = cache
|
||||
.put_rpc_custody_columns(block_root, slot, data_columns)
|
||||
.put_rpc_custody_columns(block_root, bid, data_columns)
|
||||
.expect("should put columns");
|
||||
assert!(matches!(result, Availability::MissingComponents(_)));
|
||||
|
||||
// Without an executed envelope, should still be missing components
|
||||
let result = cache
|
||||
.put_executed_payload_envelope(bid, make_test_executed_envelope(block_root))
|
||||
.expect("should put executed envelope");
|
||||
let Availability::Available(envelope) = result else {
|
||||
panic!("expected available envelope");
|
||||
};
|
||||
assert_eq!(envelope.block_root, block_root);
|
||||
assert_eq!(envelope.envelope.columns.len(), E::number_of_columns());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn zero_blob_envelope_is_available_without_columns() {
|
||||
let (harness, cache, _path) = setup().await;
|
||||
let (bid, block_root, _columns) =
|
||||
init_block(&cache, &harness.spec, NumBlobs::Number(0), RNG_SEED);
|
||||
|
||||
let result = cache
|
||||
.put_executed_payload_envelope(bid, make_test_executed_envelope(block_root))
|
||||
.expect("should put executed envelope");
|
||||
let Availability::Available(envelope) = result else {
|
||||
panic!("zero-blob block should be available");
|
||||
};
|
||||
assert!(envelope.envelope.columns.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn partial_columns_wait_for_missing_columns() {
|
||||
let (harness, cache, _path) = setup().await;
|
||||
let (bid, block_root, data_columns) =
|
||||
init_block(&cache, &harness.spec, NumBlobs::Number(1), RNG_SEED);
|
||||
|
||||
cache
|
||||
.put_executed_payload_envelope(bid.clone(), make_test_executed_envelope(block_root))
|
||||
.expect("should put executed envelope");
|
||||
|
||||
let columns = data_columns.into_iter().take(1).collect();
|
||||
let result = cache
|
||||
.put_rpc_custody_columns(block_root, bid, columns)
|
||||
.expect("should put columns");
|
||||
assert!(matches!(result, Availability::MissingComponents(_)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_full_availability_flow() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
|
||||
let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
|
||||
ForkName::Gloas,
|
||||
NumBlobs::Number(1),
|
||||
&mut rng,
|
||||
&spec,
|
||||
);
|
||||
let slot = block.slot();
|
||||
let block_root = Hash256::random();
|
||||
cache.init_pending_block(block_root, Arc::new(block));
|
||||
|
||||
let result = cache
|
||||
.put_rpc_custody_columns(block_root, slot, data_columns)
|
||||
.expect("should put columns");
|
||||
|
||||
assert!(matches!(result, Availability::MissingComponents(_)));
|
||||
|
||||
let executed_envelope = make_test_executed_envelope(block_root);
|
||||
let result = cache
|
||||
.put_executed_payload_envelope(executed_envelope)
|
||||
.expect("should put executed envelope");
|
||||
|
||||
assert!(
|
||||
matches!(result, Availability::Available(_)),
|
||||
"expected Available, got {:?}",
|
||||
result
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_zero_blob_immediately_available() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
|
||||
let (block, _) = generate_rand_block_and_data_columns::<E>(
|
||||
ForkName::Gloas,
|
||||
NumBlobs::Number(0),
|
||||
&mut rng,
|
||||
&spec,
|
||||
);
|
||||
|
||||
let block_root = Hash256::random();
|
||||
cache.init_pending_block(block_root, Arc::new(block));
|
||||
|
||||
let executed_envelope = make_test_executed_envelope(block_root);
|
||||
let result = cache
|
||||
.put_executed_payload_envelope(executed_envelope)
|
||||
.expect("should put executed envelope");
|
||||
|
||||
assert!(
|
||||
matches!(result, Availability::Available(_)),
|
||||
"zero-blob block should be immediately available, got {:?}",
|
||||
result
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_handle_reconstruction_failure_clears_columns() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
|
||||
let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
|
||||
ForkName::Gloas,
|
||||
NumBlobs::Number(1),
|
||||
&mut rng,
|
||||
&spec,
|
||||
);
|
||||
let slot = block.slot();
|
||||
let block_root = Hash256::random();
|
||||
cache.init_pending_block(block_root, Arc::new(block));
|
||||
|
||||
let columns: DataColumnSidecarList<E> = data_columns.into_iter().take(5).collect();
|
||||
async fn reconstruction_failure_clears_columns() {
|
||||
let (harness, cache, _path) = setup().await;
|
||||
let (bid, block_root, data_columns) =
|
||||
init_block(&cache, &harness.spec, NumBlobs::Number(1), RNG_SEED);
|
||||
let columns = data_columns.into_iter().take(5).collect();
|
||||
|
||||
cache
|
||||
.put_rpc_custody_columns(block_root, slot, columns)
|
||||
.put_rpc_custody_columns(block_root, bid, columns)
|
||||
.expect("should put columns");
|
||||
|
||||
let cached_count = cache.peek_pending_components(&block_root, |components| {
|
||||
components.map(|c| c.verified_data_columns.len())
|
||||
});
|
||||
assert_eq!(cached_count, Some(5));
|
||||
assert_eq!(
|
||||
cache
|
||||
.cached_data_column_indexes(&block_root)
|
||||
.map(|indices| indices.len()),
|
||||
Some(5)
|
||||
);
|
||||
|
||||
cache.handle_reconstruction_failure(&block_root);
|
||||
|
||||
let cached_count_after = cache.peek_pending_components(&block_root, |components| {
|
||||
components.map(|c| c.verified_data_columns.len())
|
||||
});
|
||||
assert_eq!(cached_count_after, Some(0));
|
||||
assert_eq!(cache.cached_data_column_indexes(&block_root), Some(vec![]));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_maintenance_removes_old_entries() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (_harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let cutoff_epoch = Epoch::new(100);
|
||||
cache
|
||||
.do_maintenance(cutoff_epoch)
|
||||
.expect("maintenance should succeed");
|
||||
|
||||
assert_eq!(cache.block_cache_size(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_peek_data_columns() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
|
||||
let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
|
||||
ForkName::Gloas,
|
||||
NumBlobs::Number(1),
|
||||
&mut rng,
|
||||
&spec,
|
||||
);
|
||||
let slot = block.slot();
|
||||
let block_root = Hash256::random();
|
||||
|
||||
assert!(cache.get_data_columns(block_root).is_none());
|
||||
|
||||
cache.init_pending_block(block_root, Arc::new(block));
|
||||
|
||||
let columns: DataColumnSidecarList<E> = data_columns.into_iter().take(3).collect();
|
||||
|
||||
cache
|
||||
.put_rpc_custody_columns(block_root, slot, columns)
|
||||
.expect("should put columns");
|
||||
|
||||
let peeked = cache.get_data_columns(block_root);
|
||||
assert!(peeked.is_some());
|
||||
assert_eq!(peeked.unwrap().len(), 3);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_lru_eviction() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
|
||||
let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
|
||||
ForkName::Gloas,
|
||||
NumBlobs::Number(1),
|
||||
&mut rng,
|
||||
&spec,
|
||||
);
|
||||
let slot = block.slot();
|
||||
let block = Arc::new(block);
|
||||
let first_column = data_columns.first().cloned().expect("should have column");
|
||||
|
||||
async fn lru_eviction_keeps_cache_bounded() {
|
||||
let (harness, cache, _path) = setup().await;
|
||||
let mut roots = Vec::new();
|
||||
for _ in 0..33 {
|
||||
let block_root = Hash256::random();
|
||||
|
||||
for i in 0..33 {
|
||||
let (bid, block_root, data_columns) =
|
||||
init_block(&cache, &harness.spec, NumBlobs::Number(1), RNG_SEED + i);
|
||||
let column = data_columns.first().cloned().expect("should have column");
|
||||
roots.push(block_root);
|
||||
cache.init_pending_block(block_root, block.clone());
|
||||
cache
|
||||
.put_rpc_custody_columns(block_root, slot, vec![first_column.clone()])
|
||||
.put_rpc_custody_columns(block_root, bid, vec![column])
|
||||
.expect("should put columns");
|
||||
}
|
||||
|
||||
@@ -1085,79 +871,20 @@ mod data_availability_checker_tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_maintenance_prunes_old_entries() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
}
|
||||
async fn maintenance_prunes_old_entries() {
|
||||
let (harness, cache, _path) = setup().await;
|
||||
let (bid, block_root, data_columns) =
|
||||
init_block(&cache, &harness.spec, NumBlobs::Number(1), RNG_SEED);
|
||||
let column = data_columns.first().cloned().expect("should have column");
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
|
||||
let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
|
||||
ForkName::Gloas,
|
||||
NumBlobs::Number(1),
|
||||
&mut rng,
|
||||
&spec,
|
||||
);
|
||||
let slot = block.slot();
|
||||
let block_root = Hash256::random();
|
||||
cache.init_pending_block(block_root, Arc::new(block));
|
||||
|
||||
let col = data_columns.first().cloned().expect("should have column");
|
||||
cache
|
||||
.put_rpc_custody_columns(block_root, slot, vec![col])
|
||||
.put_rpc_custody_columns(block_root, bid, vec![column])
|
||||
.expect("should put columns");
|
||||
|
||||
assert_eq!(cache.block_cache_size(), 1);
|
||||
|
||||
// slot=0 → epoch=0 < cutoff=100, should prune
|
||||
cache
|
||||
.do_maintenance(Epoch::new(100))
|
||||
.do_maintenance(Epoch::new(1))
|
||||
.expect("maintenance should succeed");
|
||||
|
||||
assert_eq!(cache.block_cache_size(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_partial_columns_missing_components() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
|
||||
let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
|
||||
ForkName::Gloas,
|
||||
NumBlobs::Number(1),
|
||||
&mut rng,
|
||||
&spec,
|
||||
);
|
||||
let slot = block.slot();
|
||||
let block_root = Hash256::random();
|
||||
cache.init_pending_block(block_root, Arc::new(block));
|
||||
|
||||
let executed_envelope = make_test_executed_envelope(block_root);
|
||||
cache
|
||||
.put_executed_payload_envelope(executed_envelope)
|
||||
.expect("should put executed envelope");
|
||||
|
||||
// Insert only 1 column (need 128 for fullnode)
|
||||
let columns: DataColumnSidecarList<E> = data_columns.into_iter().take(1).collect();
|
||||
|
||||
let result = cache
|
||||
.put_rpc_custody_columns(block_root, slot, columns)
|
||||
.expect("should put columns");
|
||||
|
||||
assert!(
|
||||
matches!(result, Availability::MissingComponents(_)),
|
||||
"partial columns should not trigger availability"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user