some claude progress

This commit is contained in:
Daniel Knopik
2026-04-28 17:00:42 +02:00
parent 4ef4c7ddd4
commit 3772d2fa5b
4 changed files with 198 additions and 585 deletions

View File

@@ -57,13 +57,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
); );
} }
self.data_availability_checker
.pending_payload_cache()
.put_pre_executed_payload_envelope(
unverified_envelope.envelope_cloned(),
envelope_source,
)?;
let _full_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_TIMES); let _full_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_TIMES);
metrics::inc_counter(&metrics::ENVELOPE_PROCESSING_REQUESTS); metrics::inc_counter(&metrics::ENVELOPE_PROCESSING_REQUESTS);
@@ -95,11 +88,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// If the envelope fails execution for whatever reason (e.g. engine offline), // If the envelope fails execution for whatever reason (e.g. engine offline),
// and we keep it in the cache, then the node will NOT perform lookup and // and we keep it in the cache, then the node will NOT perform lookup and
// reprocess this envelope until the envelope is evicted from DA checker, causing the // reprocess this envelope until the envelope is evicted from DA checker, causing the
// chain to get stuck temporarily if the envelope is canonical. Therefore we remove
// it from the cache if execution fails.
self.data_availability_checker
.pending_payload_cache()
.remove_pre_executed_payload_envelope(&block_root);
})?; })?;
// Record the time it took to wait for execution layer verification. // Record the time it took to wait for execution layer verification.

View File

@@ -46,6 +46,7 @@ use kzg::Kzg;
use lru::LruCache; use lru::LruCache;
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::collections::HashMap;
use std::fmt; use std::fmt;
use std::fmt::Debug; use std::fmt::Debug;
use std::num::NonZeroUsize; use std::num::NonZeroUsize;
@@ -53,9 +54,8 @@ use std::sync::Arc;
use task_executor::TaskExecutor; use task_executor::TaskExecutor;
use tracing::{Span, debug, error, instrument, trace}; use tracing::{Span, debug, error, instrument, trace};
use types::{ use types::{
BlockImportSource, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256,
EthSpec, Hash256, PartialDataColumnSidecarRef, SignedExecutionPayloadBid, PartialDataColumnSidecarRef, Slot,
SignedExecutionPayloadEnvelope, Slot,
}; };
mod pending_column; mod pending_column;
@@ -90,16 +90,6 @@ pub enum Availability<E: EthSpec> {
Available(Box<AvailableExecutedEnvelope<E>>), Available(Box<AvailableExecutedEnvelope<E>>),
} }
pub enum PayloadEnvelopeProcessingStatus<E: EthSpec> {
/// Envelope is not in any pre-import cache. Envelope may be in the data-base or in the fork-choice.
Unknown,
/// Envelope is currently processing but not yet validated.
NotValidated(Arc<SignedExecutionPayloadEnvelope<E>>, BlockImportSource),
/// Envelope is fully valid, but not yet imported. It's cached in the da_checker while awaiting
/// missing envelope components.
ExecutionValidated(Arc<SignedExecutionPayloadEnvelope<E>>),
}
impl<E: EthSpec> Debug for Availability<E> { impl<E: EthSpec> Debug for Availability<E> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self { match self {
@@ -190,6 +180,8 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
&'_ self, &'_ self,
data_column: &'a DataColumnSidecar<T::EthSpec>, data_column: &'a DataColumnSidecar<T::EthSpec>,
) -> Result<Option<PartialDataColumnSidecarRef<'a, T::EthSpec>>, MissingCellsError> { ) -> Result<Option<PartialDataColumnSidecarRef<'a, T::EthSpec>>, MissingCellsError> {
// TODO(gloas): implement cell-level missing check
Ok(None)
} }
/// Insert an executed payload envelope into the cache and performs an availability check /// Insert an executed payload envelope into the cache and performs an availability check
@@ -200,7 +192,7 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
let epoch = executed_envelope.envelope.epoch(); let epoch = executed_envelope.envelope.epoch();
let beacon_block_root = executed_envelope.envelope.beacon_block_root(); let beacon_block_root = executed_envelope.envelope.beacon_block_root();
let pending_components = let pending_components =
self.update_or_insert_pending_components(beacon_block_root, |pending_components| { self.get_pending_components(beacon_block_root, |pending_components| {
pending_components.insert_executed_payload_envelope(executed_envelope); pending_components.insert_executed_payload_envelope(executed_envelope);
Ok(()) Ok(())
})?; })?;
@@ -218,70 +210,14 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
self.check_availability(beacon_block_root, pending_components, num_expected_columns) self.check_availability(beacon_block_root, pending_components, num_expected_columns)
} }
/// Insert a pre executed payload envelope in the cache /// Initialize pending components for a block. Called when the beacon block (containing the
pub fn put_pre_executed_payload_envelope( /// bid) arrives. Sets up the slot and expected blob count so that subsequent column insertions
&self, /// know how many cells to expect per column.
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>, pub fn init_pending_block(&self, block_root: Hash256, slot: Slot, num_blobs_expected: usize) {
source: BlockImportSource, let mut write_lock = self.availability_cache.write();
) -> Result<(), AvailabilityCheckError> { write_lock.get_or_insert_mut(block_root, || {
let epoch = envelope.epoch(); PendingComponents::empty(block_root, slot, num_blobs_expected, self.spec.clone())
let beacon_block_root = envelope.beacon_block_root();
let pending_components =
self.update_or_insert_pending_components(beacon_block_root, |pending_components| {
pending_components.insert_pre_executed_payload_envelope(envelope, source);
Ok(())
})?;
let num_expected_columns = self.get_num_expected_columns(epoch);
pending_components.span.in_scope(|| {
debug!(
component = "pre executed payload envelope",
status = pending_components.status_str(num_expected_columns),
"Component added to data availability checker"
);
}); });
Ok(())
}
/// Removes a pre-executed envelope from the cache.
/// This does NOT remove an existing executed envelope.
pub fn remove_pre_executed_payload_envelope(&self, block_root: &Hash256) {
if let Some(PayloadEnvelopeProcessingStatus::NotValidated(_, _)) =
self.get_envelope_processing_status(block_root)
{
// If the envelope is execution invalid, this status is permanent and idempotent to this
// block_root. We drop its components (e.g. columns) because they will never be useful.
self.availability_cache.write().pop(block_root);
}
}
/// Insert an execution payload bid into the cache.
pub fn put_bid(
&self,
block_root: Hash256,
bid: Arc<SignedExecutionPayloadBid<T::EthSpec>>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let epoch = bid.message.slot.epoch(T::EthSpec::slots_per_epoch());
let pending_components =
self.update_or_insert_pending_components(block_root, |pending_components| {
pending_components.insert_bid(bid);
Ok(())
})?;
let num_expected_columns = self.get_num_expected_columns(epoch);
pending_components.span.in_scope(|| {
debug!(
component = "bid",
status = pending_components.status_str(num_expected_columns),
"Component added to data availability checker"
);
});
self.check_availability(block_root, pending_components, num_expected_columns)
} }
/// Perform KZG verification on RPC custody columns and insert them into the cache. /// Perform KZG verification on RPC custody columns and insert them into the cache.
@@ -347,8 +283,7 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
return Ok(Availability::MissingComponents(block_root)); return Ok(Availability::MissingComponents(block_root));
}; };
let pending_components = self let pending_components = self.get_pending_components(block_root, |pending_components| {
.update_or_insert_pending_components(block_root, |pending_components| {
pending_components.merge_data_columns(kzg_verified_data_columns) pending_components.merge_data_columns(kzg_verified_data_columns)
})?; })?;
@@ -488,12 +423,14 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
} }
} }
/// Updates or inserts a new `PendingComponents` if it doesn't exist, and then apply the /// Gets existing `PendingComponents` and applies the `update_fn` while holding the write lock.
/// `update_fn` while holding the write lock.
/// ///
/// Once the update is complete, the write lock is downgraded and a read guard with a /// Once the update is complete, the write lock is downgraded and a read guard with a
/// reference of the updated `PendingComponents` is returned. /// reference of the updated `PendingComponents` is returned.
fn update_or_insert_pending_components<F>( ///
/// Returns an error if no pending components exist for the given block root (the block must
/// be initialized via `init_pending_block` first).
fn get_pending_components<F>(
&self, &self,
block_root: Hash256, block_root: Hash256,
update_fn: F, update_fn: F,
@@ -504,9 +441,11 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
let mut write_lock = self.availability_cache.write(); let mut write_lock = self.availability_cache.write();
{ {
let pending_components = write_lock.get_or_insert_mut(block_root, || { let pending_components = write_lock.get_mut(&block_root).ok_or_else(|| {
PendingComponents::empty(block_root, self.spec.clone()) AvailabilityCheckError::Unexpected(
}); "pending components not initialized for block".to_string(),
)
})?;
update_fn(pending_components)? update_fn(pending_components)?
} }
@@ -527,6 +466,7 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
} }
/// Check whether data column reconstruction should be attempted. /// Check whether data column reconstruction should be attempted.
/// TODO(gloas): rethink reconstruction for the cell model
fn check_and_set_reconstruction_started( fn check_and_set_reconstruction_started(
&self, &self,
block_root: &Hash256, block_root: &Hash256,
@@ -563,7 +503,7 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
/// status so that we can attempt to retrieve columns from peers again. /// status so that we can attempt to retrieve columns from peers again.
fn handle_reconstruction_failure(&self, block_root: &Hash256) { fn handle_reconstruction_failure(&self, block_root: &Hash256) {
if let Some(pending_components_mut) = self.availability_cache.write().get_mut(block_root) { if let Some(pending_components_mut) = self.availability_cache.write().get_mut(block_root) {
pending_components_mut.verified_data_columns = vec![]; pending_components_mut.verified_data_columns = HashMap::new();
pending_components_mut.reconstruction_started = false; pending_components_mut.reconstruction_started = false;
} }
} }
@@ -578,9 +518,7 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
let mut write_lock = self.availability_cache.write(); let mut write_lock = self.availability_cache.write();
let mut keys_to_remove = vec![]; let mut keys_to_remove = vec![];
for (key, value) in write_lock.iter() { for (key, value) in write_lock.iter() {
if let Some(epoch) = value.epoch() if value.epoch() < cutoff_epoch {
&& epoch < cutoff_epoch
{
keys_to_remove.push(*key); keys_to_remove.push(*key);
} }
} }
@@ -703,7 +641,7 @@ mod data_availability_checker_tests {
use tempfile::{TempDir, tempdir}; use tempfile::{TempDir, tempdir};
use types::{ use types::{
ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, ForkName, FullPayload, ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, ForkName, FullPayload,
MinimalEthSpec, SignedBeaconBlock, Slot, MinimalEthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
}; };
type E = MinimalEthSpec; type E = MinimalEthSpec;
@@ -807,150 +745,15 @@ mod data_availability_checker_tests {
// once the Gloas harness can produce KZG-valid columns. These wrappers add KZG verification // once the Gloas harness can produce KZG-valid columns. These wrappers add KZG verification
// and custody column filtering on top of `put_kzg_verified_custody_data_columns`. // and custody column filtering on top of `put_kzg_verified_custody_data_columns`.
#[tokio::test] fn num_blobs_in_block<E: EthSpec>(block: &SignedBeaconBlock<E, FullPayload<E>>) -> usize {
async fn test_put_columns_creates_pending_components() { block
if !is_gloas_enabled() {
return;
}
type T = DiskHarnessType<E>;
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
let spec = harness.spec.clone();
let (_block, data_columns) = generate_rand_block_and_data_columns::<E>(
ForkName::Gloas,
NumBlobs::Number(1),
&mut rng,
&spec,
);
let block_root = Hash256::random();
let verified_columns: Vec<_> = data_columns
.into_iter()
.take(1) // Just take one column for the test
.map(|col| {
KzgVerifiedCustodyDataColumn::from_asserted_custody(
KzgVerifiedDataColumn::__new_for_testing(col),
)
})
.collect();
// Put columns into cache
let result = cache.put_kzg_verified_custody_data_columns(block_root, verified_columns);
assert!(result.is_ok());
// Check that pending components were created
assert_eq!(cache.block_cache_size(), 1);
// Verify columns are cached
let cached_indices = cache.peek_pending_components(&block_root, |components| {
components.map(|c| c.get_cached_data_columns_indices())
});
assert!(cached_indices.is_some());
assert_eq!(cached_indices.unwrap().len(), 1);
}
#[tokio::test]
async fn test_column_deduplication() {
if !is_gloas_enabled() {
return;
}
type T = DiskHarnessType<E>;
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
let spec = harness.spec.clone();
let (_block, data_columns) = generate_rand_block_and_data_columns::<E>(
ForkName::Gloas,
NumBlobs::Number(1),
&mut rng,
&spec,
);
let block_root = Hash256::random();
// Get the first column
let first_column = data_columns.first().cloned().expect("should have column");
let column_index = *first_column.index();
let verified_column = KzgVerifiedCustodyDataColumn::from_asserted_custody(
KzgVerifiedDataColumn::__new_for_testing(first_column.clone()),
);
// Insert the same column twice
cache
.put_kzg_verified_custody_data_columns(block_root, vec![verified_column.clone()])
.expect("should put column");
cache
.put_kzg_verified_custody_data_columns(block_root, vec![verified_column])
.expect("should put column again");
// Check that we still only have one column (deduplicated)
let cached_indices = cache.peek_pending_components(&block_root, |components| {
components.map(|c| c.get_cached_data_columns_indices())
});
assert!(cached_indices.is_some());
let indices = cached_indices.unwrap();
assert_eq!(indices.len(), 1);
assert_eq!(indices[0], column_index);
}
#[tokio::test]
async fn test_columns_without_block_not_available() {
if !is_gloas_enabled() {
return;
}
type T = DiskHarnessType<E>;
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
let spec = harness.spec.clone();
let (_block, data_columns) = generate_rand_block_and_data_columns::<E>(
ForkName::Gloas,
NumBlobs::Number(1),
&mut rng,
&spec,
);
let block_root = Hash256::random();
// Add all columns
let verified_columns: Vec<_> = data_columns
.into_iter()
.map(|col| {
KzgVerifiedCustodyDataColumn::from_asserted_custody(
KzgVerifiedDataColumn::__new_for_testing(col),
)
})
.collect();
let result = cache
.put_kzg_verified_custody_data_columns(block_root, verified_columns)
.expect("should put columns");
// Without a bid, should still be missing components
assert!(matches!(result, Availability::MissingComponents(_)));
}
/// Helper to create a test bid with the given block root and kzg commitments from a block.
fn make_test_bid<E: EthSpec>(
block: &SignedBeaconBlock<E, FullPayload<E>>,
) -> Arc<SignedExecutionPayloadBid<E>> {
let bid = block
.message() .message()
.body() .body()
.signed_execution_payload_bid() .signed_execution_payload_bid()
.expect("gloas block should have bid") .expect("gloas block should have bid")
.clone(); .message
Arc::new(bid) .blob_kzg_commitments
.len()
} }
fn make_test_signed_envelope(block_root: Hash256) -> Arc<SignedExecutionPayloadEnvelope<E>> { fn make_test_signed_envelope(block_root: Hash256) -> Arc<SignedExecutionPayloadEnvelope<E>> {
@@ -978,6 +781,135 @@ mod data_availability_checker_tests {
} }
} }
#[tokio::test]
async fn test_put_columns_creates_pending_components() {
if !is_gloas_enabled() {
return;
}
type T = DiskHarnessType<E>;
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
let spec = harness.spec.clone();
let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
ForkName::Gloas,
NumBlobs::Number(1),
&mut rng,
&spec,
);
let block_root = Hash256::random();
cache.init_pending_block(block_root, Slot::new(0), num_blobs_in_block(&block));
let verified_columns: Vec<_> = data_columns
.into_iter()
.take(1)
.map(|col| {
KzgVerifiedCustodyDataColumn::from_asserted_custody(
KzgVerifiedDataColumn::__new_for_testing(col),
)
})
.collect();
let result = cache.put_kzg_verified_custody_data_columns(block_root, verified_columns);
assert!(result.is_ok());
assert_eq!(cache.block_cache_size(), 1);
let cached_indices = cache.peek_pending_components(&block_root, |components| {
components.map(|c| c.get_cached_data_columns_indices())
});
assert!(cached_indices.is_some());
assert_eq!(cached_indices.unwrap().len(), 1);
}
#[tokio::test]
async fn test_column_deduplication() {
if !is_gloas_enabled() {
return;
}
type T = DiskHarnessType<E>;
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
let spec = harness.spec.clone();
let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
ForkName::Gloas,
NumBlobs::Number(1),
&mut rng,
&spec,
);
let block_root = Hash256::random();
cache.init_pending_block(block_root, Slot::new(0), num_blobs_in_block(&block));
let first_column = data_columns.first().cloned().expect("should have column");
let column_index = *first_column.index();
let verified_column = KzgVerifiedCustodyDataColumn::from_asserted_custody(
KzgVerifiedDataColumn::__new_for_testing(first_column.clone()),
);
cache
.put_kzg_verified_custody_data_columns(block_root, vec![verified_column.clone()])
.expect("should put column");
cache
.put_kzg_verified_custody_data_columns(block_root, vec![verified_column])
.expect("should put column again");
let cached_indices = cache.peek_pending_components(&block_root, |components| {
components.map(|c| c.get_cached_data_columns_indices())
});
assert!(cached_indices.is_some());
let indices = cached_indices.unwrap();
assert_eq!(indices.len(), 1);
assert_eq!(indices[0], column_index);
}
#[tokio::test]
async fn test_columns_without_envelope_not_available() {
if !is_gloas_enabled() {
return;
}
type T = DiskHarnessType<E>;
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
let spec = harness.spec.clone();
let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
ForkName::Gloas,
NumBlobs::Number(1),
&mut rng,
&spec,
);
let block_root = Hash256::random();
cache.init_pending_block(block_root, Slot::new(0), num_blobs_in_block(&block));
let verified_columns: Vec<_> = data_columns
.into_iter()
.map(|col| {
KzgVerifiedCustodyDataColumn::from_asserted_custody(
KzgVerifiedDataColumn::__new_for_testing(col),
)
})
.collect();
let result = cache
.put_kzg_verified_custody_data_columns(block_root, verified_columns)
.expect("should put columns");
// Without an executed envelope, should still be missing components
assert!(matches!(result, Availability::MissingComponents(_)));
}
#[tokio::test] #[tokio::test]
async fn test_full_availability_flow() { async fn test_full_availability_flow() {
if !is_gloas_enabled() { if !is_gloas_enabled() {
@@ -998,13 +930,7 @@ mod data_availability_checker_tests {
); );
let block_root = Hash256::random(); let block_root = Hash256::random();
let bid = make_test_bid(&block); cache.init_pending_block(block_root, Slot::new(0), num_blobs_in_block(&block));
cache.put_bid(block_root, bid).expect("should put bid");
assert!(matches!(
cache.put_bid(block_root, make_test_bid(&block)),
Ok(Availability::MissingComponents(_))
));
let verified_columns: Vec<_> = data_columns let verified_columns: Vec<_> = data_columns
.into_iter() .into_iter()
@@ -1021,21 +947,6 @@ mod data_availability_checker_tests {
assert!(matches!(result, Availability::MissingComponents(_))); assert!(matches!(result, Availability::MissingComponents(_)));
// Insert pre-executed envelope first
cache
.put_pre_executed_payload_envelope(
make_test_signed_envelope(block_root),
BlockImportSource::Gossip,
)
.expect("should put pre-executed envelope");
let status = cache.get_envelope_processing_status(&block_root);
assert!(matches!(
status,
Some(PayloadEnvelopeProcessingStatus::NotValidated(..))
));
// Upgrade to executed envelope (after EL validation)
let executed_envelope = make_test_executed_envelope(block_root); let executed_envelope = make_test_executed_envelope(block_root);
let result = cache let result = cache
.put_executed_payload_envelope(executed_envelope) .put_executed_payload_envelope(executed_envelope)
@@ -1049,152 +960,7 @@ mod data_availability_checker_tests {
} }
#[tokio::test] #[tokio::test]
async fn test_zero_blob_bid_immediately_available() { async fn test_zero_blob_immediately_available() {
if !is_gloas_enabled() {
return;
}
type T = DiskHarnessType<E>;
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
let spec = harness.spec.clone();
// Generate a block with 0 blobs — bid will have empty commitments
let (block, _data_columns) = generate_rand_block_and_data_columns::<E>(
ForkName::Gloas,
NumBlobs::Number(0),
&mut rng,
&spec,
);
let block_root = Hash256::random();
let bid = make_test_bid(&block);
// Insert bid (no blobs expected)
cache.put_bid(block_root, bid).expect("should put bid");
// Insert executed envelope — should become available immediately (no columns needed)
let executed_envelope = make_test_executed_envelope(block_root);
let result = cache
.put_executed_payload_envelope(executed_envelope)
.expect("should put executed envelope");
assert!(
matches!(result, Availability::Available(_)),
"zero-blob bid should be immediately available, got {:?}",
result
);
}
#[tokio::test]
async fn test_columns_arrive_before_bid() {
if !is_gloas_enabled() {
return;
}
type T = DiskHarnessType<E>;
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
let spec = harness.spec.clone();
let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
ForkName::Gloas,
NumBlobs::Number(1),
&mut rng,
&spec,
);
let block_root = Hash256::random();
// Columns arrive before bid
let verified_columns: Vec<_> = data_columns
.into_iter()
.map(|col| {
KzgVerifiedCustodyDataColumn::from_asserted_custody(
KzgVerifiedDataColumn::__new_for_testing(col),
)
})
.collect();
let result = cache
.put_kzg_verified_custody_data_columns(block_root, verified_columns)
.expect("should put columns");
assert!(matches!(result, Availability::MissingComponents(_)));
let bid = make_test_bid(&block);
let result = cache.put_bid(block_root, bid).expect("should put bid");
assert!(matches!(result, Availability::MissingComponents(_)));
let executed_envelope = make_test_executed_envelope(block_root);
let result = cache
.put_executed_payload_envelope(executed_envelope)
.expect("should put executed envelope");
assert!(
matches!(result, Availability::Available(_)),
"expected Available after all components inserted, got {:?}",
result
);
}
#[tokio::test]
async fn test_pre_executed_envelope_not_available() {
if !is_gloas_enabled() {
return;
}
type T = DiskHarnessType<E>;
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
let spec = harness.spec.clone();
let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
ForkName::Gloas,
NumBlobs::Number(1),
&mut rng,
&spec,
);
let block_root = Hash256::random();
// Insert bid + all columns
cache
.put_bid(block_root, make_test_bid(&block))
.expect("should put bid");
let verified_columns: Vec<_> = data_columns
.into_iter()
.map(|col| {
KzgVerifiedCustodyDataColumn::from_asserted_custody(
KzgVerifiedDataColumn::__new_for_testing(col),
)
})
.collect();
cache
.put_kzg_verified_custody_data_columns(block_root, verified_columns)
.expect("should put columns");
// Insert pre-executed envelope (not yet validated by EL)
cache
.put_pre_executed_payload_envelope(
make_test_signed_envelope(block_root),
BlockImportSource::Gossip,
)
.expect("should put pre-executed envelope");
// Should NOT be available — envelope not executed yet
let status = cache.get_envelope_processing_status(&block_root);
assert!(matches!(
status,
Some(PayloadEnvelopeProcessingStatus::NotValidated(..))
));
}
#[tokio::test]
async fn test_remove_pre_executed_envelope() {
if !is_gloas_enabled() { if !is_gloas_enabled() {
return; return;
} }
@@ -1203,93 +969,18 @@ mod data_availability_checker_tests {
let (_harness, cache, _path) = setup_harness_and_cache::<T>().await; let (_harness, cache, _path) = setup_harness_and_cache::<T>().await;
let block_root = Hash256::random(); let block_root = Hash256::random();
cache.init_pending_block(block_root, Slot::new(0), 0);
// Insert pre-executed envelope
cache
.put_pre_executed_payload_envelope(
make_test_signed_envelope(block_root),
BlockImportSource::Gossip,
)
.expect("should put pre-executed envelope");
// Verify it's there
assert!(cache.get_envelope_processing_status(&block_root).is_some());
// Remove it
cache.remove_pre_executed_payload_envelope(&block_root);
// Should be gone
let status = cache.get_envelope_processing_status(&block_root);
assert!(status.is_none());
}
#[tokio::test]
async fn test_remove_pre_executed_does_not_remove_executed() {
if !is_gloas_enabled() {
return;
}
type T = DiskHarnessType<E>;
let (_harness, cache, _path) = setup_harness_and_cache::<T>().await;
let block_root = Hash256::random();
// Insert executed envelope
let executed_envelope = make_test_executed_envelope(block_root); let executed_envelope = make_test_executed_envelope(block_root);
cache let result = cache
.put_executed_payload_envelope(executed_envelope) .put_executed_payload_envelope(executed_envelope)
.expect("should put executed envelope"); .expect("should put executed envelope");
// Try to remove as pre-executed — should be a no-op assert!(
cache.remove_pre_executed_payload_envelope(&block_root); matches!(result, Availability::Available(_)),
"zero-blob block should be immediately available, got {:?}",
// Should still be there as executed result
let status = cache.get_envelope_processing_status(&block_root);
assert!(matches!(
status,
Some(PayloadEnvelopeProcessingStatus::ExecutionValidated(..))
));
}
#[tokio::test]
async fn test_reconstruction_started_flag() {
if !is_gloas_enabled() {
return;
}
type T = DiskHarnessType<E>;
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
let spec = harness.spec.clone();
let (_block, data_columns) = generate_rand_block_and_data_columns::<E>(
ForkName::Gloas,
NumBlobs::Number(1),
&mut rng,
&spec,
); );
let block_root = Hash256::random();
// Add some columns (not enough for reconstruction threshold)
let verified_columns: Vec<_> = data_columns
.into_iter()
.take(10) // Not enough for reconstruction
.map(|col| {
KzgVerifiedCustodyDataColumn::from_asserted_custody(
KzgVerifiedDataColumn::__new_for_testing(col),
)
})
.collect();
cache
.put_kzg_verified_custody_data_columns(block_root, verified_columns)
.expect("should put columns");
// Check reconstruction decision - should say "not enough columns"
let decision = cache.check_and_set_reconstruction_started(&block_root);
assert!(matches!(decision, ReconstructColumnsDecision::No(_)));
} }
#[tokio::test] #[tokio::test]
@@ -1304,7 +995,7 @@ mod data_availability_checker_tests {
let mut rng = StdRng::seed_from_u64(0xDEADBEEF); let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
let spec = harness.spec.clone(); let spec = harness.spec.clone();
let (_block, data_columns) = generate_rand_block_and_data_columns::<E>( let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
ForkName::Gloas, ForkName::Gloas,
NumBlobs::Number(1), NumBlobs::Number(1),
&mut rng, &mut rng,
@@ -1312,8 +1003,8 @@ mod data_availability_checker_tests {
); );
let block_root = Hash256::random(); let block_root = Hash256::random();
cache.init_pending_block(block_root, Slot::new(0), num_blobs_in_block(&block));
// Add some columns
let verified_columns: Vec<_> = data_columns let verified_columns: Vec<_> = data_columns
.into_iter() .into_iter()
.take(5) .take(5)
@@ -1328,16 +1019,13 @@ mod data_availability_checker_tests {
.put_kzg_verified_custody_data_columns(block_root, verified_columns) .put_kzg_verified_custody_data_columns(block_root, verified_columns)
.expect("should put columns"); .expect("should put columns");
// Verify columns are cached
let cached_count = cache.peek_pending_components(&block_root, |components| { let cached_count = cache.peek_pending_components(&block_root, |components| {
components.map(|c| c.verified_data_columns.len()) components.map(|c| c.verified_data_columns.len())
}); });
assert_eq!(cached_count, Some(5)); assert_eq!(cached_count, Some(5));
// Handle reconstruction failure
cache.handle_reconstruction_failure(&block_root); cache.handle_reconstruction_failure(&block_root);
// Verify columns are cleared
let cached_count_after = cache.peek_pending_components(&block_root, |components| { let cached_count_after = cache.peek_pending_components(&block_root, |components| {
components.map(|c| c.verified_data_columns.len()) components.map(|c| c.verified_data_columns.len())
}); });
@@ -1353,13 +1041,11 @@ mod data_availability_checker_tests {
type T = DiskHarnessType<E>; type T = DiskHarnessType<E>;
let (_harness, cache, _path) = setup_harness_and_cache::<T>().await; let (_harness, cache, _path) = setup_harness_and_cache::<T>().await;
// Run maintenance with a future cutoff epoch
let cutoff_epoch = Epoch::new(100); let cutoff_epoch = Epoch::new(100);
cache cache
.do_maintenance(cutoff_epoch) .do_maintenance(cutoff_epoch)
.expect("maintenance should succeed"); .expect("maintenance should succeed");
// Cache should still be empty since we didn't add anything with an epoch
assert_eq!(cache.block_cache_size(), 0); assert_eq!(cache.block_cache_size(), 0);
} }
@@ -1375,7 +1061,7 @@ mod data_availability_checker_tests {
let mut rng = StdRng::seed_from_u64(0xDEADBEEF); let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
let spec = harness.spec.clone(); let spec = harness.spec.clone();
let (_block, data_columns) = generate_rand_block_and_data_columns::<E>( let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
ForkName::Gloas, ForkName::Gloas,
NumBlobs::Number(1), NumBlobs::Number(1),
&mut rng, &mut rng,
@@ -1384,10 +1070,10 @@ mod data_availability_checker_tests {
let block_root = Hash256::random(); let block_root = Hash256::random();
// No columns yet
assert!(cache.get_data_columns(block_root).is_none()); assert!(cache.get_data_columns(block_root).is_none());
// Add columns cache.init_pending_block(block_root, Slot::new(0), num_blobs_in_block(&block));
let verified_columns: Vec<_> = data_columns let verified_columns: Vec<_> = data_columns
.into_iter() .into_iter()
.take(3) .take(3)
@@ -1402,7 +1088,6 @@ mod data_availability_checker_tests {
.put_kzg_verified_custody_data_columns(block_root, verified_columns) .put_kzg_verified_custody_data_columns(block_root, verified_columns)
.expect("should put columns"); .expect("should put columns");
// Now columns should be returned
let peeked = cache.get_data_columns(block_root); let peeked = cache.get_data_columns(block_root);
assert!(peeked.is_some()); assert!(peeked.is_some());
assert_eq!(peeked.unwrap().len(), 3); assert_eq!(peeked.unwrap().len(), 3);
@@ -1420,18 +1105,20 @@ mod data_availability_checker_tests {
let mut rng = StdRng::seed_from_u64(0xDEADBEEF); let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
let spec = harness.spec.clone(); let spec = harness.spec.clone();
let (_block, data_columns) = generate_rand_block_and_data_columns::<E>( let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
ForkName::Gloas, ForkName::Gloas,
NumBlobs::Number(1), NumBlobs::Number(1),
&mut rng, &mut rng,
&spec, &spec,
); );
// LRU capacity is 32 (OVERFLOW_LRU_CAPACITY_NON_ZERO). Insert 33 entries. let num_blobs = num_blobs_in_block(&block);
let mut roots = Vec::new(); let mut roots = Vec::new();
for _ in 0..33 { for _ in 0..33 {
let block_root = Hash256::random(); let block_root = Hash256::random();
roots.push(block_root); roots.push(block_root);
cache.init_pending_block(block_root, Slot::new(0), num_blobs);
let col = data_columns.first().cloned().expect("should have column"); let col = data_columns.first().cloned().expect("should have column");
let verified = vec![KzgVerifiedCustodyDataColumn::from_asserted_custody( let verified = vec![KzgVerifiedCustodyDataColumn::from_asserted_custody(
KzgVerifiedDataColumn::__new_for_testing(col), KzgVerifiedDataColumn::__new_for_testing(col),
@@ -1466,11 +1153,7 @@ mod data_availability_checker_tests {
); );
let block_root = Hash256::random(); let block_root = Hash256::random();
cache.init_pending_block(block_root, Slot::new(0), num_blobs_in_block(&block));
// Insert bid (gives the entry an epoch via the bid's slot)
cache
.put_bid(block_root, make_test_bid(&block))
.expect("should put bid");
let col = data_columns.first().cloned().expect("should have column"); let col = data_columns.first().cloned().expect("should have column");
let verified = vec![KzgVerifiedCustodyDataColumn::from_asserted_custody( let verified = vec![KzgVerifiedCustodyDataColumn::from_asserted_custody(
@@ -1482,7 +1165,7 @@ mod data_availability_checker_tests {
assert_eq!(cache.block_cache_size(), 1); assert_eq!(cache.block_cache_size(), 1);
// Maintenance with cutoff in the future should prune (bid slot=0 → epoch=0 < cutoff=100) // slot=0 → epoch=0 < cutoff=100, should prune
cache cache
.do_maintenance(Epoch::new(100)) .do_maintenance(Epoch::new(100))
.expect("maintenance should succeed"); .expect("maintenance should succeed");
@@ -1490,58 +1173,6 @@ mod data_availability_checker_tests {
assert_eq!(cache.block_cache_size(), 0); assert_eq!(cache.block_cache_size(), 0);
} }
#[tokio::test]
async fn test_double_reconstruction_prevented() {
if !is_gloas_enabled() {
return;
}
type T = DiskHarnessType<E>;
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
let spec = harness.spec.clone();
let (_block, data_columns) = generate_rand_block_and_data_columns::<E>(
ForkName::Gloas,
NumBlobs::Number(1),
&mut rng,
&spec,
);
let block_root = Hash256::random();
// Insert all columns so reconstruction threshold is met
let verified_columns: Vec<_> = data_columns
.into_iter()
.map(|col| {
KzgVerifiedCustodyDataColumn::from_asserted_custody(
KzgVerifiedDataColumn::__new_for_testing(col),
)
})
.collect();
cache
.put_kzg_verified_custody_data_columns(block_root, verified_columns)
.expect("should put columns");
// Manually set reconstruction_started via check_and_set
// For fullnode, sampling == all columns, so this returns No("all sampling columns received")
// But we can set the flag manually to test the guard
cache
.availability_cache
.write()
.get_mut(&block_root)
.expect("should exist")
.reconstruction_started = true;
let decision = cache.check_and_set_reconstruction_started(&block_root);
assert!(
matches!(decision, ReconstructColumnsDecision::No(reason) if reason == "already started"),
"second reconstruction attempt should be prevented"
);
}
#[tokio::test] #[tokio::test]
async fn test_partial_columns_missing_components() { async fn test_partial_columns_missing_components() {
if !is_gloas_enabled() { if !is_gloas_enabled() {
@@ -1562,11 +1193,7 @@ mod data_availability_checker_tests {
); );
let block_root = Hash256::random(); let block_root = Hash256::random();
cache.init_pending_block(block_root, Slot::new(0), num_blobs_in_block(&block));
// Insert bid and executed envelope
cache
.put_bid(block_root, make_test_bid(&block))
.expect("should put bid");
let executed_envelope = make_test_executed_envelope(block_root); let executed_envelope = make_test_executed_envelope(block_root);
cache cache

View File

@@ -3,6 +3,7 @@ use ssz_types::VariableList;
use std::sync::Arc; use std::sync::Arc;
use types::{Cell, ColumnIndex, DataColumnSidecar, DataColumnSidecarGloas, EthSpec, Hash256, Slot}; use types::{Cell, ColumnIndex, DataColumnSidecar, DataColumnSidecarGloas, EthSpec, Hash256, Slot};
#[derive(Clone)]
pub struct PendingColumn<E: EthSpec> { pub struct PendingColumn<E: EthSpec> {
cells: Vec<Option<(Cell<E>, KzgProof)>>, cells: Vec<Option<(Cell<E>, KzgProof)>>,
} }

View File

@@ -9,10 +9,7 @@ use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tracing::{Span, debug, debug_span}; use tracing::{Span, debug, debug_span};
use types::Slot; use types::Slot;
use types::{ use types::{ChainSpec, ColumnIndex, Epoch, EthSpec, Hash256};
ChainSpec, ColumnIndex, DataColumnSidecar, Epoch, EthSpec, Hash256,
SignedExecutionPayloadEnvelope,
};
/// This represents the components of a payload pending data availability. /// This represents the components of a payload pending data availability.
/// ///