diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index 1b875162c6..6a61f61362 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -853,9 +853,7 @@ impl ReprocessQueue { } // Unqueue the data columns we have for this root, if any. - if let Some(queued_ids) = - self.awaiting_data_columns_per_root.remove(&block_root) - { + if let Some(queued_ids) = self.awaiting_data_columns_per_root.remove(&block_root) { for col_id in queued_ids { if let Some((data_column, delay_key)) = self.queued_gossip_data_columns.remove(&col_id) @@ -866,10 +864,7 @@ impl ReprocessQueue { .try_send(ReadyWork::DataColumn(data_column)) .is_err() { - error!( - ?block_root, - "Failed to send data column for reprocessing" - ); + error!(?block_root, "Failed to send data column for reprocessing"); } } } @@ -1724,4 +1719,82 @@ mod tests { .contains_key(&overflow_root) ); } + + /// Tests that a queued gossip data column is released when its block is imported. + #[tokio::test] + async fn data_column_released_on_block_imported() { + create_test_tracing_subscriber(); + + let config = BeaconProcessorConfig::default(); + let (ready_work_tx, mut ready_work_rx) = + mpsc::channel::(config.max_scheduled_work_queue_len); + let (_, reprocess_work_rx) = + mpsc::channel::(config.max_scheduled_work_queue_len); + let slot_clock = Arc::new(testing_slot_clock(12)); + let mut queue = ReprocessQueue::new(ready_work_tx, reprocess_work_rx, slot_clock); + + tokio::time::pause(); + + let beacon_block_root = Hash256::repeat_byte(0xbb); + + let msg = ReprocessQueueMessage::UnknownBlockDataColumn(QueuedGossipDataColumn { + beacon_block_root, + process_fn: Box::new(|| {}), + }); + queue.handle_message(InboundEvent::Msg(msg)); + + assert_eq!(queue.awaiting_data_columns_per_root.len(), 1); + assert_eq!(queue.queued_gossip_data_columns.len(), 1); + assert_eq!(queue.data_columns_delay_queue.len(), 1); + + // Simulate block import. + queue.handle_message(InboundEvent::Msg(ReprocessQueueMessage::BlockImported { + block_root: beacon_block_root, + parent_root: Hash256::repeat_byte(0x00), + })); + + // Internal state should be cleaned up. + assert!(queue.awaiting_data_columns_per_root.is_empty()); + assert!(queue.queued_gossip_data_columns.is_empty()); + assert_eq!(queue.data_columns_delay_queue.len(), 0); + + // The column should have been sent to the ready_work channel. + let ready = ready_work_rx.try_recv().expect("column should be ready"); + assert!(matches!(ready, ReadyWork::DataColumn(_))); + } + + /// Tests that an expired gossip data column is pruned cleanly from all internal state. + #[tokio::test] + async fn prune_awaiting_data_columns_per_root() { + create_test_tracing_subscriber(); + + let mut queue = test_queue(); + + tokio::time::pause(); + + let beacon_block_root = Hash256::repeat_byte(0xcd); + + let msg = ReprocessQueueMessage::UnknownBlockDataColumn(QueuedGossipDataColumn { + beacon_block_root, + process_fn: Box::new(|| {}), + }); + queue.handle_message(InboundEvent::Msg(msg)); + + assert_eq!(queue.awaiting_data_columns_per_root.len(), 1); + assert!( + queue + .awaiting_data_columns_per_root + .contains_key(&beacon_block_root) + ); + + // Advance time past the delay so the entry expires. + advance_time(&queue.slot_clock, 2 * QUEUED_ATTESTATION_DELAY).await; + let ready_msg = queue.next().await.unwrap(); + assert!(matches!(ready_msg, InboundEvent::ReadyDataColumn(_))); + queue.handle_message(ready_msg); + + // All internal state should be cleaned up. + assert!(queue.awaiting_data_columns_per_root.is_empty()); + assert!(queue.queued_gossip_data_columns.is_empty()); + } } diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 5dce67ec55..892baf7ee2 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -740,8 +740,8 @@ impl NetworkBeaconProcessor { // Queue the column for reprocessing when the block arrives. let processor = self.clone(); - let reprocess_msg = ReprocessQueueMessage::UnknownBlockDataColumn( - QueuedGossipDataColumn { + let reprocess_msg = + ReprocessQueueMessage::UnknownBlockDataColumn(QueuedGossipDataColumn { beacon_block_root: unknown_block_root, process_fn: Box::new(move || { // Re-dispatch through the normal gossip column processing path. @@ -753,8 +753,7 @@ impl NetworkBeaconProcessor { seen_duration, ); }), - }, - ); + }); if self .beacon_processor_send .try_send(WorkEvent {