diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index ab2acd16a1..764b09745a 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -253,7 +253,7 @@ enum InboundEvent { /// A column reconstruction that was queued is ready for processing. ReadyColumnReconstruction(QueuedColumnReconstruction), /// A gossip data column that timed out waiting for its block. - ReadyDataColumn(usize), + ReadyDataColumn(Hash256), /// A message sent to the `ReprocessQueue` Msg(ReprocessQueueMessage), } @@ -278,7 +278,8 @@ struct ReprocessQueue { lc_updates_delay_queue: DelayQueue, /// Queue to manage scheduled column reconstructions. column_reconstructions_delay_queue: DelayQueue, - data_columns_delay_queue: DelayQueue, + /// Queue to manage gossip data column timeouts. + data_columns_delay_queue: DelayQueue, /* Queued items */ /// Queued blocks. @@ -299,15 +300,12 @@ struct ReprocessQueue { queued_column_reconstructions: HashMap>, /// Queued backfill batches queued_backfill_batches: Vec, - /// Queued gossip data columns awaiting their block. - queued_gossip_data_columns: FnvHashMap, - /// Data columns per block root. - awaiting_data_columns_per_root: HashMap>, + /// Queued gossip data columns awaiting their block, keyed by block root. + queued_gossip_data_columns: HashMap, DelayKey)>, /* Aux */ /// Next attestation id, used for both aggregated and unaggregated attestations next_attestation: usize, - next_data_column: usize, next_lc_update: usize, early_block_debounce: TimeLatch, envelope_delay_debounce: TimeLatch, @@ -408,8 +406,8 @@ impl Stream for ReprocessQueue { } match self.data_columns_delay_queue.poll_expired(cx) { - Poll::Ready(Some(col_id)) => { - return Poll::Ready(Some(InboundEvent::ReadyDataColumn(col_id.into_inner()))); + Poll::Ready(Some(block_root)) => { + return Poll::Ready(Some(InboundEvent::ReadyDataColumn(block_root.into_inner()))); } Poll::Ready(None) | Poll::Pending => (), } @@ -492,10 +490,8 @@ impl ReprocessQueue { awaiting_lc_updates_per_parent_root: HashMap::new(), queued_backfill_batches: Vec::new(), queued_column_reconstructions: HashMap::new(), - queued_gossip_data_columns: FnvHashMap::default(), - awaiting_data_columns_per_root: HashMap::new(), + queued_gossip_data_columns: HashMap::new(), next_attestation: 0, - next_data_column: 0, next_lc_update: 0, early_block_debounce: TimeLatch::default(), envelope_delay_debounce: TimeLatch::default(), @@ -720,27 +716,25 @@ impl ReprocessQueue { self.next_attestation += 1; } InboundEvent::Msg(UnknownBlockDataColumn(queued_data_column)) => { - if self.queued_gossip_data_columns.len() >= MAXIMUM_QUEUED_DATA_COLUMNS { - return; + let block_root = queued_data_column.beacon_block_root; + + if let Some((columns, _delay_key)) = + self.queued_gossip_data_columns.get_mut(&block_root) + { + // Append to existing entry; the timer for this root is already running. + columns.push(queued_data_column); + } else { + if self.queued_gossip_data_columns.len() >= MAXIMUM_QUEUED_DATA_COLUMNS { + return; + } + + let delay_key = self + .data_columns_delay_queue + .insert(block_root, QUEUED_ATTESTATION_DELAY); + + self.queued_gossip_data_columns + .insert(block_root, (vec![queued_data_column], delay_key)); } - - let col_id = self.next_data_column; - - let delay_key = self - .data_columns_delay_queue - .insert(col_id, QUEUED_ATTESTATION_DELAY); - - // Register this column for the corresponding block root. - self.awaiting_data_columns_per_root - .entry(queued_data_column.beacon_block_root) - .or_default() - .push(col_id); - - // Store the column and its info. - self.queued_gossip_data_columns - .insert(col_id, (queued_data_column, delay_key)); - - self.next_data_column += 1; } InboundEvent::Msg(UnknownLightClientOptimisticUpdate( queued_light_client_optimistic_update, @@ -856,19 +850,17 @@ impl ReprocessQueue { } // Unqueue the data columns we have for this root, if any. - if let Some(queued_ids) = self.awaiting_data_columns_per_root.remove(&block_root) { - for col_id in queued_ids { - if let Some((data_column, delay_key)) = - self.queued_gossip_data_columns.remove(&col_id) + if let Some((data_columns, delay_key)) = + self.queued_gossip_data_columns.remove(&block_root) + { + self.data_columns_delay_queue.remove(&delay_key); + for data_column in data_columns { + if self + .ready_work_tx + .try_send(ReadyWork::DataColumn(data_column)) + .is_err() { - self.data_columns_delay_queue.remove(&delay_key); - if self - .ready_work_tx - .try_send(ReadyWork::DataColumn(data_column)) - .is_err() - { - error!(?block_root, "Failed to send data column for reprocessing"); - } + error!(?block_root, "Failed to send data column for reprocessing"); } } } @@ -1125,29 +1117,21 @@ impl ReprocessQueue { ); } } - InboundEvent::ReadyDataColumn(col_id) => { - if let Some((data_column, _)) = self.queued_gossip_data_columns.remove(&col_id) { - // Clean up the per-root index. - let root = data_column.beacon_block_root; - if let Entry::Occupied(mut entry) = - self.awaiting_data_columns_per_root.entry(root) - { - let ids = entry.get_mut(); - ids.retain(|&id| id != col_id); - if ids.is_empty() { - entry.remove_entry(); + InboundEvent::ReadyDataColumn(block_root) => { + if let Some((data_columns, _)) = self.queued_gossip_data_columns.remove(&block_root) + { + for data_column in data_columns { + if self + .ready_work_tx + .try_send(ReadyWork::DataColumn(data_column)) + .is_err() + { + error!( + hint = "system may be overloaded", + "Ignored expired gossip data column" + ); } } - if self - .ready_work_tx - .try_send(ReadyWork::DataColumn(data_column)) - .is_err() - { - error!( - hint = "system may be overloaded", - "Ignored expired gossip data column" - ); - } } } }