From 4ad036a28176d448292b694fd4a29d93b9cfad98 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Mon, 2 Mar 2026 16:52:40 +1100 Subject: [PATCH] Fix duplicate column reconstruction race in reprocessing queue --- .../src/scheduler/work_reprocessing_queue.rs | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index c99388287c..d7f5f70000 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -257,7 +257,9 @@ struct ReprocessQueue { /// Light Client Updates per parent_root. awaiting_lc_updates_per_parent_root: HashMap>, /// Column reconstruction per block root. - queued_column_reconstructions: HashMap, + /// `Some(key)` = active timer in the delay queue. + /// `None` = reconstruction already fired, reject late messages. + queued_column_reconstructions: HashMap>, /// Queued backfill batches queued_backfill_batches: Vec, @@ -772,15 +774,18 @@ impl ReprocessQueue { reconstruction_delay = Duration::from_secs(0); } match self.queued_column_reconstructions.entry(request.block_root) { - Entry::Occupied(key) => { - self.column_reconstructions_delay_queue - .reset(key.get(), reconstruction_delay); + Entry::Occupied(entry) => { + if let Some(delay_key) = entry.get() { + self.column_reconstructions_delay_queue + .reset(delay_key, reconstruction_delay); + } + // None → reconstruction already fired, skip } Entry::Vacant(vacant) => { let delay_key = self .column_reconstructions_delay_queue .insert(request, reconstruction_delay); - vacant.insert(delay_key); + vacant.insert(Some(delay_key)); } } } @@ -921,8 +926,12 @@ impl ReprocessQueue { } } InboundEvent::ReadyColumnReconstruction(column_reconstruction) => { + let block_root = column_reconstruction.block_root; + // Prune old fired entries before marking current self.queued_column_reconstructions - .remove(&column_reconstruction.block_root); + .retain(|_, v| v.is_some()); + // Mark as fired (prevents duplicate reconstruction from late messages) + self.queued_column_reconstructions.insert(block_root, None); if self .ready_work_tx .try_send(ReadyWork::ColumnReconstruction(column_reconstruction))