From 330348ea14bd58828564005795f99ffe874bc7c1 Mon Sep 17 00:00:00 2001 From: jking-aus <72330194+jking-aus@users.noreply.github.com> Date: Fri, 1 May 2026 14:44:25 +0200 Subject: [PATCH] fix: prevent duplicate column reconstruction dispatch (#9250) Fixes a flaky CI failure in `data_column_reconstruction_at_deadline` where 2 `column_reconstruction` events are emitted instead of the expected 1. - Change `queued_column_reconstructions` from `HashMap<Hash256, DelayKey>` to `HashMap<Hash256, Option<DelayKey>>`, where `None` indicates reconstruction was already dispatched. - On dispatch (`ReadyColumnReconstruction`), set the entry to `None` instead of removing it. This prevents a subsequent gossip column from inserting a fresh reconstruction request into the now-vacant slot. - Prune stale `None` entries left by earlier dispatches on each new dispatch, so only the most recently dispatched block root keeps its `None` marker and the map stays bounded. Co-Authored-By: Josh King --- .../src/scheduler/work_reprocessing_queue.rs | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index 38306b3bb6..b1fa56af01 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -280,8 +280,8 @@ struct ReprocessQueue { queued_lc_updates: FnvHashMap, /// Light Client Updates per parent_root. awaiting_lc_updates_per_parent_root: HashMap>, - /// Column reconstruction per block root. - queued_column_reconstructions: HashMap, + /// Column reconstruction per block root. `None` means reconstruction was already dispatched. 
+ queued_column_reconstructions: HashMap>, /// Queued backfill batches queued_backfill_batches: Vec, @@ -865,20 +865,20 @@ impl ReprocessQueue { && duration_from_current_slot >= reconstruction_deadline && current_slot == request.slot { - // If we are at least `reconstruction_deadline` seconds into the current slot, - // and the reconstruction request is for the current slot, process reconstruction immediately. reconstruction_delay = Duration::from_secs(0); } match self.queued_column_reconstructions.entry(request.block_root) { - Entry::Occupied(key) => { - self.column_reconstructions_delay_queue - .reset(key.get(), reconstruction_delay); + Entry::Occupied(entry) => { + if let Some(delay_key) = entry.get() { + self.column_reconstructions_delay_queue + .reset(delay_key, reconstruction_delay); + } } Entry::Vacant(vacant) => { let delay_key = self .column_reconstructions_delay_queue .insert(request, reconstruction_delay); - vacant.insert(delay_key); + vacant.insert(Some(delay_key)); } } } @@ -1039,7 +1039,9 @@ impl ReprocessQueue { } InboundEvent::ReadyColumnReconstruction(column_reconstruction) => { self.queued_column_reconstructions - .remove(&column_reconstruction.block_root); + .retain(|_, v| v.is_some()); + self.queued_column_reconstructions + .insert(column_reconstruction.block_root, None); if self .ready_work_tx .try_send(ReadyWork::ColumnReconstruction(column_reconstruction)) @@ -1398,7 +1400,10 @@ mod tests { queue.handle_message(InboundEvent::ReadyColumnReconstruction(reconstruction)); } - assert!(queue.queued_column_reconstructions.is_empty()); + assert_eq!( + queue.queued_column_reconstructions.get(&block_root), + Some(&None) + ); } /// Tests that column reconstruction queued after the deadline is triggered immediately