fix: prevent duplicate column reconstruction dispatch (#9250)

Fixes a flaky CI failure in `data_column_reconstruction_at_deadline` where 2 `column_reconstruction` events are emitted instead of the expected 1.


  - Change `queued_column_reconstructions` from `HashMap<Hash256, DelayKey>` to `HashMap<Hash256, Option<DelayKey>>`, where `None` indicates reconstruction was already dispatched.
- On dispatch (`ReadyColumnReconstruction`), set the entry to `None` instead of removing it. This prevents a subsequent gossip column from inserting a fresh reconstruction request into the now-vacant slot.
- Prune stale `None` entries on each dispatch to keep the map bounded.


Co-Authored-By: Josh King <josh@sigmaprime.io>
This commit is contained in:
jking-aus
2026-05-01 14:44:25 +02:00
committed by GitHub
parent 8b8124d4a4
commit 330348ea14

View File

@@ -280,8 +280,8 @@ struct ReprocessQueue<S> {
queued_lc_updates: FnvHashMap<usize, (QueuedLightClientUpdate, DelayKey)>,
/// Light Client Updates per parent_root.
awaiting_lc_updates_per_parent_root: HashMap<Hash256, Vec<QueuedLightClientUpdateId>>,
/// Column reconstruction per block root.
queued_column_reconstructions: HashMap<Hash256, DelayKey>,
/// Column reconstruction per block root. `None` means reconstruction was already dispatched.
queued_column_reconstructions: HashMap<Hash256, Option<DelayKey>>,
/// Queued backfill batches
queued_backfill_batches: Vec<QueuedBackfillBatch>,
@@ -865,20 +865,20 @@ impl<S: SlotClock> ReprocessQueue<S> {
&& duration_from_current_slot >= reconstruction_deadline
&& current_slot == request.slot
{
// If we are at least `reconstruction_deadline` seconds into the current slot,
// and the reconstruction request is for the current slot, process reconstruction immediately.
reconstruction_delay = Duration::from_secs(0);
}
match self.queued_column_reconstructions.entry(request.block_root) {
Entry::Occupied(key) => {
Entry::Occupied(entry) => {
if let Some(delay_key) = entry.get() {
self.column_reconstructions_delay_queue
.reset(key.get(), reconstruction_delay);
.reset(delay_key, reconstruction_delay);
}
}
Entry::Vacant(vacant) => {
let delay_key = self
.column_reconstructions_delay_queue
.insert(request, reconstruction_delay);
vacant.insert(delay_key);
vacant.insert(Some(delay_key));
}
}
}
@@ -1039,7 +1039,9 @@ impl<S: SlotClock> ReprocessQueue<S> {
}
InboundEvent::ReadyColumnReconstruction(column_reconstruction) => {
self.queued_column_reconstructions
.remove(&column_reconstruction.block_root);
.retain(|_, v| v.is_some());
self.queued_column_reconstructions
.insert(column_reconstruction.block_root, None);
if self
.ready_work_tx
.try_send(ReadyWork::ColumnReconstruction(column_reconstruction))
@@ -1398,7 +1400,10 @@ mod tests {
queue.handle_message(InboundEvent::ReadyColumnReconstruction(reconstruction));
}
assert!(queue.queued_column_reconstructions.is_empty());
assert_eq!(
queue.queued_column_reconstructions.get(&block_root),
Some(&None)
);
}
/// Tests that column reconstruction queued after the deadline is triggered immediately