diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index 9ff26e7841..c99388287c 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -761,10 +761,10 @@ impl ReprocessQueue { let reconstruction_deadline_millis = (slot_duration * RECONSTRUCTION_DEADLINE.0) / RECONSTRUCTION_DEADLINE.1; let reconstruction_deadline = Duration::from_millis(reconstruction_deadline_millis); - if let Some(seconds_from_current_slot) = - self.slot_clock.seconds_from_current_slot_start() + if let Some(duration_from_current_slot) = + self.slot_clock.millis_from_current_slot_start() && let Some(current_slot) = self.slot_clock.now() - && seconds_from_current_slot >= reconstruction_deadline + && duration_from_current_slot >= reconstruction_deadline && current_slot == request.slot { // If we are at least `reconstruction_deadline` seconds into the current slot, @@ -1227,4 +1227,116 @@ mod tests { // The entry for the block root should be gone. assert!(queue.awaiting_lc_updates_per_parent_root.is_empty()); } + + async fn test_reconstruction_immediate_at_deadline(slot_duration_secs: u64) { + let config = BeaconProcessorConfig::default(); + let (ready_work_tx, _) = mpsc::channel::(config.max_scheduled_work_queue_len); + let (_, reprocess_work_rx) = + mpsc::channel::(config.max_scheduled_work_queue_len); + let slot_clock = Arc::new(testing_slot_clock(slot_duration_secs)); + let mut queue = ReprocessQueue::new(ready_work_tx, reprocess_work_rx, slot_clock); + + let slot_duration = queue.slot_clock.slot_duration(); + let reconstruction_deadline_millis = (slot_duration.as_millis() as u64 + * RECONSTRUCTION_DEADLINE.0) + / RECONSTRUCTION_DEADLINE.1; + let reconstruction_deadline = Duration::from_millis(reconstruction_deadline_millis); + + // Advance time to just after the deadline + advance_time( + &queue.slot_clock, + reconstruction_deadline + Duration::from_millis(10), + ) + .await; + + let current_slot = queue.slot_clock.now().unwrap(); + let block_root = Hash256::repeat_byte(0xaa); + + // Queue a reconstruction for the current slot after the deadline + let reconstruction_request = QueuedColumnReconstruction { + block_root, + slot: current_slot, + process_fn: Box::pin(async {}), + }; + queue.handle_message(InboundEvent::Msg( + ReprocessQueueMessage::DelayColumnReconstruction(reconstruction_request), + )); + + assert_eq!(queue.queued_column_reconstructions.len(), 1); + + // Should be immediately ready (0 delay since we're past deadline) + let ready_msg = queue.next().await.unwrap(); + assert!(matches!( + ready_msg, + InboundEvent::ReadyColumnReconstruction(_) + )); + + if let InboundEvent::ReadyColumnReconstruction(reconstruction) = ready_msg { + assert_eq!(reconstruction.block_root, block_root); + queue.handle_message(InboundEvent::ReadyColumnReconstruction(reconstruction)); + } + + assert!(queue.queued_column_reconstructions.is_empty()); + } + + /// Tests that column reconstruction queued after the deadline is triggered immediately + /// on mainnet (12s slots). + /// + /// When a reconstruction for the current slot is queued after the reconstruction deadline + /// (1/4 of slot duration = 3s for mainnet), it should be processed immediately with 0 delay. + #[tokio::test] + async fn column_reconstruction_immediate_processing_at_deadline_mainnet() { + tokio::time::pause(); + test_reconstruction_immediate_at_deadline(12).await; + } + + /// Tests that column reconstruction queued after the deadline is triggered immediately + /// on Gnosis (5s slots). + /// + /// When a reconstruction for the current slot is queued after the reconstruction deadline + /// (1/4 of slot duration = 1.25s for Gnosis), it should be processed immediately with 0 delay. + #[tokio::test] + async fn column_reconstruction_immediate_processing_at_deadline_gnosis() { + tokio::time::pause(); + test_reconstruction_immediate_at_deadline(5).await; + } + + /// Tests that column reconstruction uses the standard delay when queued before the deadline. + /// + /// When a reconstruction for the current slot is queued before the deadline, it should wait + /// for the standard QUEUED_RECONSTRUCTION_DELAY (150ms) before being triggered. + #[tokio::test] + async fn column_reconstruction_uses_standard_delay() { + tokio::time::pause(); + + let mut queue = test_queue(); + let current_slot = queue.slot_clock.now().unwrap(); + let block_root = Hash256::repeat_byte(0xcc); + + // Queue a reconstruction at the start of the slot (before deadline) + let reconstruction_request = QueuedColumnReconstruction { + block_root, + slot: current_slot, + process_fn: Box::pin(async {}), + }; + queue.handle_message(InboundEvent::Msg( + ReprocessQueueMessage::DelayColumnReconstruction(reconstruction_request), + )); + + assert_eq!(queue.queued_column_reconstructions.len(), 1); + + // Advance time by QUEUED_RECONSTRUCTION_DELAY + advance_time(&queue.slot_clock, QUEUED_RECONSTRUCTION_DELAY).await; + + // Should be ready after the standard delay + let ready_msg = queue.next().await.unwrap(); + assert!(matches!( + ready_msg, + InboundEvent::ReadyColumnReconstruction(_) + )); + + if let InboundEvent::ReadyColumnReconstruction(reconstruction) = ready_msg { + assert_eq!(reconstruction.block_root, block_root); + } + } }