mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-08 01:05:47 +00:00
Fix duplicate column reconstruction race in reprocessing queue
This commit is contained in:
@@ -257,7 +257,9 @@ struct ReprocessQueue<S> {
|
|||||||
/// Light Client Updates per parent_root.
|
/// Light Client Updates per parent_root.
|
||||||
awaiting_lc_updates_per_parent_root: HashMap<Hash256, Vec<QueuedLightClientUpdateId>>,
|
awaiting_lc_updates_per_parent_root: HashMap<Hash256, Vec<QueuedLightClientUpdateId>>,
|
||||||
/// Column reconstruction per block root.
|
/// Column reconstruction per block root.
|
||||||
queued_column_reconstructions: HashMap<Hash256, DelayKey>,
|
/// `Some(key)` = active timer in the delay queue.
|
||||||
|
/// `None` = reconstruction already fired, reject late messages.
|
||||||
|
queued_column_reconstructions: HashMap<Hash256, Option<DelayKey>>,
|
||||||
/// Queued backfill batches
|
/// Queued backfill batches
|
||||||
queued_backfill_batches: Vec<QueuedBackfillBatch>,
|
queued_backfill_batches: Vec<QueuedBackfillBatch>,
|
||||||
|
|
||||||
@@ -772,15 +774,18 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
|||||||
reconstruction_delay = Duration::from_secs(0);
|
reconstruction_delay = Duration::from_secs(0);
|
||||||
}
|
}
|
||||||
match self.queued_column_reconstructions.entry(request.block_root) {
|
match self.queued_column_reconstructions.entry(request.block_root) {
|
||||||
Entry::Occupied(key) => {
|
Entry::Occupied(entry) => {
|
||||||
|
if let Some(delay_key) = entry.get() {
|
||||||
self.column_reconstructions_delay_queue
|
self.column_reconstructions_delay_queue
|
||||||
.reset(key.get(), reconstruction_delay);
|
.reset(delay_key, reconstruction_delay);
|
||||||
|
}
|
||||||
|
// None → reconstruction already fired, skip
|
||||||
}
|
}
|
||||||
Entry::Vacant(vacant) => {
|
Entry::Vacant(vacant) => {
|
||||||
let delay_key = self
|
let delay_key = self
|
||||||
.column_reconstructions_delay_queue
|
.column_reconstructions_delay_queue
|
||||||
.insert(request, reconstruction_delay);
|
.insert(request, reconstruction_delay);
|
||||||
vacant.insert(delay_key);
|
vacant.insert(Some(delay_key));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -921,8 +926,12 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
InboundEvent::ReadyColumnReconstruction(column_reconstruction) => {
|
InboundEvent::ReadyColumnReconstruction(column_reconstruction) => {
|
||||||
|
let block_root = column_reconstruction.block_root;
|
||||||
|
// Prune old fired entries before marking current
|
||||||
self.queued_column_reconstructions
|
self.queued_column_reconstructions
|
||||||
.remove(&column_reconstruction.block_root);
|
.retain(|_, v| v.is_some());
|
||||||
|
// Mark as fired (prevents duplicate reconstruction from late messages)
|
||||||
|
self.queued_column_reconstructions.insert(block_root, None);
|
||||||
if self
|
if self
|
||||||
.ready_work_tx
|
.ready_work_tx
|
||||||
.try_send(ReadyWork::ColumnReconstruction(column_reconstruction))
|
.try_send(ReadyWork::ColumnReconstruction(column_reconstruction))
|
||||||
|
|||||||
Reference in New Issue
Block a user