Add tests

This commit is contained in:
Eitan Seri-Levi
2026-05-22 15:31:09 +03:00
parent 9f4e3f367a
commit b076e09876
2 changed files with 83 additions and 11 deletions

View File

@@ -853,9 +853,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
} }
// Unqueue the data columns we have for this root, if any. // Unqueue the data columns we have for this root, if any.
if let Some(queued_ids) = if let Some(queued_ids) = self.awaiting_data_columns_per_root.remove(&block_root) {
self.awaiting_data_columns_per_root.remove(&block_root)
{
for col_id in queued_ids { for col_id in queued_ids {
if let Some((data_column, delay_key)) = if let Some((data_column, delay_key)) =
self.queued_gossip_data_columns.remove(&col_id) self.queued_gossip_data_columns.remove(&col_id)
@@ -866,10 +864,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
.try_send(ReadyWork::DataColumn(data_column)) .try_send(ReadyWork::DataColumn(data_column))
.is_err() .is_err()
{ {
error!( error!(?block_root, "Failed to send data column for reprocessing");
?block_root,
"Failed to send data column for reprocessing"
);
} }
} }
} }
@@ -1724,4 +1719,82 @@ mod tests {
.contains_key(&overflow_root) .contains_key(&overflow_root)
); );
} }
/// Tests that a queued gossip data column is released when its block is imported.
#[tokio::test]
async fn data_column_released_on_block_imported() {
create_test_tracing_subscriber();
let config = BeaconProcessorConfig::default();
let (ready_work_tx, mut ready_work_rx) =
mpsc::channel::<ReadyWork>(config.max_scheduled_work_queue_len);
let (_, reprocess_work_rx) =
mpsc::channel::<ReprocessQueueMessage>(config.max_scheduled_work_queue_len);
let slot_clock = Arc::new(testing_slot_clock(12));
let mut queue = ReprocessQueue::new(ready_work_tx, reprocess_work_rx, slot_clock);
tokio::time::pause();
let beacon_block_root = Hash256::repeat_byte(0xbb);
let msg = ReprocessQueueMessage::UnknownBlockDataColumn(QueuedGossipDataColumn {
beacon_block_root,
process_fn: Box::new(|| {}),
});
queue.handle_message(InboundEvent::Msg(msg));
assert_eq!(queue.awaiting_data_columns_per_root.len(), 1);
assert_eq!(queue.queued_gossip_data_columns.len(), 1);
assert_eq!(queue.data_columns_delay_queue.len(), 1);
// Simulate block import.
queue.handle_message(InboundEvent::Msg(ReprocessQueueMessage::BlockImported {
block_root: beacon_block_root,
parent_root: Hash256::repeat_byte(0x00),
}));
// Internal state should be cleaned up.
assert!(queue.awaiting_data_columns_per_root.is_empty());
assert!(queue.queued_gossip_data_columns.is_empty());
assert_eq!(queue.data_columns_delay_queue.len(), 0);
// The column should have been sent to the ready_work channel.
let ready = ready_work_rx.try_recv().expect("column should be ready");
assert!(matches!(ready, ReadyWork::DataColumn(_)));
}
/// Tests that an expired gossip data column is pruned cleanly from all internal state.
#[tokio::test]
async fn prune_awaiting_data_columns_per_root() {
create_test_tracing_subscriber();
let mut queue = test_queue();
tokio::time::pause();
let beacon_block_root = Hash256::repeat_byte(0xcd);
let msg = ReprocessQueueMessage::UnknownBlockDataColumn(QueuedGossipDataColumn {
beacon_block_root,
process_fn: Box::new(|| {}),
});
queue.handle_message(InboundEvent::Msg(msg));
assert_eq!(queue.awaiting_data_columns_per_root.len(), 1);
assert!(
queue
.awaiting_data_columns_per_root
.contains_key(&beacon_block_root)
);
// Advance time past the delay so the entry expires.
advance_time(&queue.slot_clock, 2 * QUEUED_ATTESTATION_DELAY).await;
let ready_msg = queue.next().await.unwrap();
assert!(matches!(ready_msg, InboundEvent::ReadyDataColumn(_)));
queue.handle_message(ready_msg);
// All internal state should be cleaned up.
assert!(queue.awaiting_data_columns_per_root.is_empty());
assert!(queue.queued_gossip_data_columns.is_empty());
}
} }

View File

@@ -740,8 +740,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// Queue the column for reprocessing when the block arrives. // Queue the column for reprocessing when the block arrives.
let processor = self.clone(); let processor = self.clone();
let reprocess_msg = ReprocessQueueMessage::UnknownBlockDataColumn( let reprocess_msg =
QueuedGossipDataColumn { ReprocessQueueMessage::UnknownBlockDataColumn(QueuedGossipDataColumn {
beacon_block_root: unknown_block_root, beacon_block_root: unknown_block_root,
process_fn: Box::new(move || { process_fn: Box::new(move || {
// Re-dispatch through the normal gossip column processing path. // Re-dispatch through the normal gossip column processing path.
@@ -753,8 +753,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
seen_duration, seen_duration,
); );
}), }),
}, });
);
if self if self
.beacon_processor_send .beacon_processor_send
.try_send(WorkEvent { .try_send(WorkEvent {