keyed by hash256

This commit is contained in:
Eitan Seri-Levi
2026-05-23 17:50:28 +03:00
parent 7f84ac18c7
commit 3febb7275c

View File

@@ -253,7 +253,7 @@ enum InboundEvent {
/// A column reconstruction that was queued is ready for processing.
ReadyColumnReconstruction(QueuedColumnReconstruction),
/// A gossip data column that timed out waiting for its block.
ReadyDataColumn(usize),
ReadyDataColumn(Hash256),
/// A message sent to the `ReprocessQueue`
Msg(ReprocessQueueMessage),
}
@@ -278,7 +278,8 @@ struct ReprocessQueue<S> {
lc_updates_delay_queue: DelayQueue<QueuedLightClientUpdateId>,
/// Queue to manage scheduled column reconstructions.
column_reconstructions_delay_queue: DelayQueue<QueuedColumnReconstruction>,
data_columns_delay_queue: DelayQueue<usize>,
/// Queue to manage gossip data column timeouts.
data_columns_delay_queue: DelayQueue<Hash256>,
/* Queued items */
/// Queued blocks.
@@ -299,15 +300,12 @@ struct ReprocessQueue<S> {
queued_column_reconstructions: HashMap<Hash256, Option<DelayKey>>,
/// Queued backfill batches
queued_backfill_batches: Vec<QueuedBackfillBatch>,
/// Queued gossip data columns awaiting their block.
queued_gossip_data_columns: FnvHashMap<usize, (QueuedGossipDataColumn, DelayKey)>,
/// Data columns per block root.
awaiting_data_columns_per_root: HashMap<Hash256, Vec<usize>>,
/// Queued gossip data columns awaiting their block, keyed by block root.
queued_gossip_data_columns: HashMap<Hash256, (Vec<QueuedGossipDataColumn>, DelayKey)>,
/* Aux */
/// Next attestation id, used for both aggregated and unaggregated attestations
next_attestation: usize,
next_data_column: usize,
next_lc_update: usize,
early_block_debounce: TimeLatch,
envelope_delay_debounce: TimeLatch,
@@ -408,8 +406,8 @@ impl<S: SlotClock> Stream for ReprocessQueue<S> {
}
match self.data_columns_delay_queue.poll_expired(cx) {
Poll::Ready(Some(col_id)) => {
return Poll::Ready(Some(InboundEvent::ReadyDataColumn(col_id.into_inner())));
Poll::Ready(Some(block_root)) => {
return Poll::Ready(Some(InboundEvent::ReadyDataColumn(block_root.into_inner())));
}
Poll::Ready(None) | Poll::Pending => (),
}
@@ -492,10 +490,8 @@ impl<S: SlotClock> ReprocessQueue<S> {
awaiting_lc_updates_per_parent_root: HashMap::new(),
queued_backfill_batches: Vec::new(),
queued_column_reconstructions: HashMap::new(),
queued_gossip_data_columns: FnvHashMap::default(),
awaiting_data_columns_per_root: HashMap::new(),
queued_gossip_data_columns: HashMap::new(),
next_attestation: 0,
next_data_column: 0,
next_lc_update: 0,
early_block_debounce: TimeLatch::default(),
envelope_delay_debounce: TimeLatch::default(),
@@ -720,27 +716,25 @@ impl<S: SlotClock> ReprocessQueue<S> {
self.next_attestation += 1;
}
InboundEvent::Msg(UnknownBlockDataColumn(queued_data_column)) => {
let block_root = queued_data_column.beacon_block_root;
if let Some((columns, _delay_key)) =
self.queued_gossip_data_columns.get_mut(&block_root)
{
// Append to existing entry; the timer for this root is already running.
columns.push(queued_data_column);
} else {
if self.queued_gossip_data_columns.len() >= MAXIMUM_QUEUED_DATA_COLUMNS {
return;
}
let col_id = self.next_data_column;
let delay_key = self
.data_columns_delay_queue
.insert(col_id, QUEUED_ATTESTATION_DELAY);
.insert(block_root, QUEUED_ATTESTATION_DELAY);
// Register this column for the corresponding block root.
self.awaiting_data_columns_per_root
.entry(queued_data_column.beacon_block_root)
.or_default()
.push(col_id);
// Store the column and its info.
self.queued_gossip_data_columns
.insert(col_id, (queued_data_column, delay_key));
self.next_data_column += 1;
.insert(block_root, (vec![queued_data_column], delay_key));
}
}
InboundEvent::Msg(UnknownLightClientOptimisticUpdate(
queued_light_client_optimistic_update,
@@ -856,12 +850,11 @@ impl<S: SlotClock> ReprocessQueue<S> {
}
// Unqueue the data columns we have for this root, if any.
if let Some(queued_ids) = self.awaiting_data_columns_per_root.remove(&block_root) {
for col_id in queued_ids {
if let Some((data_column, delay_key)) =
self.queued_gossip_data_columns.remove(&col_id)
if let Some((data_columns, delay_key)) =
self.queued_gossip_data_columns.remove(&block_root)
{
self.data_columns_delay_queue.remove(&delay_key);
for data_column in data_columns {
if self
.ready_work_tx
.try_send(ReadyWork::DataColumn(data_column))
@@ -872,7 +865,6 @@ impl<S: SlotClock> ReprocessQueue<S> {
}
}
}
}
InboundEvent::Msg(NewLightClientOptimisticUpdate { parent_root }) => {
// Unqueue the light client optimistic updates we have for this root, if any.
if let Some(queued_lc_id) = self
@@ -1125,19 +1117,10 @@ impl<S: SlotClock> ReprocessQueue<S> {
);
}
}
InboundEvent::ReadyDataColumn(col_id) => {
if let Some((data_column, _)) = self.queued_gossip_data_columns.remove(&col_id) {
// Clean up the per-root index.
let root = data_column.beacon_block_root;
if let Entry::Occupied(mut entry) =
self.awaiting_data_columns_per_root.entry(root)
InboundEvent::ReadyDataColumn(block_root) => {
if let Some((data_columns, _)) = self.queued_gossip_data_columns.remove(&block_root)
{
let ids = entry.get_mut();
ids.retain(|&id| id != col_id);
if ids.is_empty() {
entry.remove_entry();
}
}
for data_column in data_columns {
if self
.ready_work_tx
.try_send(ReadyWork::DataColumn(data_column))
@@ -1151,6 +1134,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
}
}
}
}
metrics::set_gauge_vec(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,