From 0e99b454a3add2215490fa6888a4f70400f3f99d Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sun, 24 May 2026 11:09:51 +0300 Subject: [PATCH] Fixes --- .../src/scheduler/work_reprocessing_queue.rs | 35 +++++++++++++++---- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index d9b8687444..47d7e834a0 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -52,6 +52,9 @@ pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12); /// For how long to queue light client updates for re-processing. pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12); +/// For how long to queue gossip data columns awaiting their block for re-processing. +pub const QUEUED_GOSSIP_DATA_COLUMN_DELAY: Duration = Duration::from_secs(12); + /// Envelope timeout as a multiplier of slot duration. Envelopes waiting for their block will be /// sent for processing after this many slots worth of time, even if the block hasn't arrived. const QUEUED_ENVELOPE_DELAY_SLOTS: u32 = 1; @@ -302,6 +305,8 @@ struct ReprocessQueue { queued_backfill_batches: Vec, /// Queued gossip data columns awaiting their block, keyed by block root. awaiting_data_columns_per_root: HashMap, DelayKey)>, + /// Total number of queued gossip data columns across all roots. + queued_data_columns_count: usize, /* Aux */ /// Next attestation id, used for both aggregated and unaggregated attestations @@ -312,6 +317,7 @@ struct ReprocessQueue { rpc_block_debounce: TimeLatch, attestation_delay_debounce: TimeLatch, lc_update_delay_debounce: TimeLatch, + data_column_delay_debounce: TimeLatch, next_backfill_batch_event: Option>>, slot_clock: Arc, } @@ -491,6 +497,7 @@ impl ReprocessQueue { queued_backfill_batches: Vec::new(), queued_column_reconstructions: HashMap::new(), awaiting_data_columns_per_root: HashMap::new(), + queued_data_columns_count: 0, next_attestation: 0, next_lc_update: 0, early_block_debounce: TimeLatch::default(), @@ -498,6 +505,7 @@ impl ReprocessQueue { rpc_block_debounce: TimeLatch::default(), attestation_delay_debounce: TimeLatch::default(), lc_update_delay_debounce: TimeLatch::default(), + data_column_delay_debounce: TimeLatch::default(), next_backfill_batch_event: None, slot_clock, } @@ -718,23 +726,32 @@ impl ReprocessQueue { InboundEvent::Msg(UnknownBlockDataColumn(queued_data_column)) => { let block_root = queued_data_column.beacon_block_root; + if self.queued_data_columns_count >= MAXIMUM_QUEUED_DATA_COLUMNS { + if self.data_column_delay_debounce.elapsed() { + warn!( + queue_size = MAXIMUM_QUEUED_DATA_COLUMNS, + msg = "system resources may be saturated", + "Data column delay queue is full, dropping column" + ); + } + return; + } + if let Some((columns, _delay_key)) = self.awaiting_data_columns_per_root.get_mut(&block_root) { // Append to existing entry; the timer for this root is already running. columns.push(queued_data_column); } else { - if self.awaiting_data_columns_per_root.len() >= MAXIMUM_QUEUED_DATA_COLUMNS { - return; - } - let delay_key = self .data_columns_delay_queue - .insert(block_root, QUEUED_ATTESTATION_DELAY); + .insert(block_root, QUEUED_GOSSIP_DATA_COLUMN_DELAY); self.awaiting_data_columns_per_root .insert(block_root, (vec![queued_data_column], delay_key)); } + + self.queued_data_columns_count += 1; } InboundEvent::Msg(UnknownLightClientOptimisticUpdate( queued_light_client_optimistic_update, @@ -854,6 +871,9 @@ impl ReprocessQueue { self.awaiting_data_columns_per_root.remove(&block_root) { self.data_columns_delay_queue.remove(&delay_key); + self.queued_data_columns_count = self + .queued_data_columns_count + .saturating_sub(data_columns.len()); for data_column in data_columns { if self .ready_work_tx @@ -1121,6 +1141,9 @@ impl ReprocessQueue { if let Some((data_columns, _)) = self.awaiting_data_columns_per_root.remove(&block_root) { + self.queued_data_columns_count = self + .queued_data_columns_count + .saturating_sub(data_columns.len()); for data_column in data_columns { if self .ready_work_tx @@ -1779,7 +1802,7 @@ mod tests { ); // Advance time past the delay so the entry expires. - advance_time(&queue.slot_clock, 2 * QUEUED_ATTESTATION_DELAY).await; + advance_time(&queue.slot_clock, 2 * QUEUED_GOSSIP_DATA_COLUMN_DELAY).await; let ready_msg = queue.next().await.unwrap(); assert!(matches!(ready_msg, InboundEvent::ReadyDataColumn(_))); queue.handle_message(ready_msg);