From 684632df731a69d6e42531bc1c323557a7b45d7e Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 18 Sep 2025 15:16:59 +1000 Subject: [PATCH] Fix reprocess queue memory leak (#8065) Fix a memory leak in the reprocess queue. If the vec of attestation IDs for a block is never evicted from the reprocess queue by a `BlockImported` event, then it stays in the map forever consuming memory. The fix is to remove the entry when its last attestation times out. We do similarly for light client updates. In practice this will only occur if there is a race between adding an attestation to the queue and processing the `BlockImported` event, or if there are attestations for block roots that we never import (e.g. random block roots, block roots of invalid blocks). Co-Authored-By: Michael Sproul --- .../src/scheduler/work_reprocessing_queue.rs | 139 ++++++++++++++++-- 1 file changed, 130 insertions(+), 9 deletions(-) diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index 9565e57589..3e755f0830 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -37,7 +37,9 @@ const TASK_NAME: &str = "beacon_processor_reprocess_queue"; const GOSSIP_BLOCKS: &str = "gossip_blocks"; const RPC_BLOCKS: &str = "rpc_blocks"; const ATTESTATIONS: &str = "attestations"; +const ATTESTATIONS_PER_ROOT: &str = "attestations_per_root"; const LIGHT_CLIENT_UPDATES: &str = "lc_updates"; +const LIGHT_CLIENT_UPDATES_PER_PARENT_ROOT: &str = "lc_updates_per_parent_root"; /// Queue blocks for re-processing with an `ADDITIONAL_QUEUED_BLOCK_DELAY` after the slot starts. /// This is to account for any slight drift in the system clock. 
@@ -829,10 +831,19 @@ impl ReprocessQueue { ); } - if let Some(queued_atts) = self.awaiting_attestations_per_root.get_mut(&root) - && let Some(index) = queued_atts.iter().position(|&id| id == queued_id) + if let Entry::Occupied(mut queued_atts) = + self.awaiting_attestations_per_root.entry(root) + && let Some(index) = + queued_atts.get().iter().position(|&id| id == queued_id) { - queued_atts.swap_remove(index); + let queued_atts_mut = queued_atts.get_mut(); + queued_atts_mut.swap_remove(index); + + // If the vec is empty after this attestation's removal, we need to delete + // the entry to prevent bloating the hashmap indefinitely. + if queued_atts_mut.is_empty() { + queued_atts.remove_entry(); + } } } } @@ -853,13 +864,19 @@ impl ReprocessQueue { error!("Failed to send scheduled light client optimistic update"); } - if let Some(queued_lc_updates) = self - .awaiting_lc_updates_per_parent_root - .get_mut(&parent_root) - && let Some(index) = - queued_lc_updates.iter().position(|&id| id == queued_id) + if let Entry::Occupied(mut queued_lc_updates) = + self.awaiting_lc_updates_per_parent_root.entry(parent_root) + && let Some(index) = queued_lc_updates + .get() + .iter() + .position(|&id| id == queued_id) { - queued_lc_updates.swap_remove(index); + let queued_lc_updates_mut = queued_lc_updates.get_mut(); + queued_lc_updates_mut.swap_remove(index); + + if queued_lc_updates_mut.is_empty() { + queued_lc_updates.remove_entry(); + } } } } @@ -929,11 +946,21 @@ impl ReprocessQueue { &[ATTESTATIONS], self.attestations_delay_queue.len() as i64, ); + metrics::set_gauge_vec( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, + &[ATTESTATIONS_PER_ROOT], + self.awaiting_attestations_per_root.len() as i64, + ); metrics::set_gauge_vec( &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, &[LIGHT_CLIENT_UPDATES], self.lc_updates_delay_queue.len() as i64, ); + metrics::set_gauge_vec( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, + &[LIGHT_CLIENT_UPDATES_PER_PARENT_ROOT], + 
self.awaiting_lc_updates_per_parent_root.len() as i64, + ); } fn recompute_next_backfill_batch_event(&mut self) { @@ -979,6 +1006,7 @@ impl ReprocessQueue { #[cfg(test)] mod tests { use super::*; + use crate::BeaconProcessorConfig; use logging::create_test_tracing_subscriber; use slot_clock::{ManualSlotClock, TestingSlotClock}; use std::ops::Add; @@ -1101,4 +1129,97 @@ mod tests { Duration::from_secs(slot_duration), ) } + + fn test_queue() -> ReprocessQueue<ManualSlotClock> { + create_test_tracing_subscriber(); + + let config = BeaconProcessorConfig::default(); + let (ready_work_tx, _) = mpsc::channel::<ReadyWork>(config.max_scheduled_work_queue_len); + let (_, reprocess_work_rx) = + mpsc::channel::<ReprocessQueueMessage>(config.max_scheduled_work_queue_len); + let slot_clock = Arc::new(testing_slot_clock(12)); + + ReprocessQueue::new(ready_work_tx, reprocess_work_rx, slot_clock) + } + + // This is a regression test for a memory leak in `awaiting_attestations_per_root`. + // See: https://github.com/sigp/lighthouse/pull/8065 + #[tokio::test] + async fn prune_awaiting_attestations_per_root() { + create_test_tracing_subscriber(); + + let mut queue = test_queue(); + + // Pause time so it only advances manually + tokio::time::pause(); + + let beacon_block_root = Hash256::repeat_byte(0xaf); + + // Insert an attestation. + let att = ReprocessQueueMessage::UnknownBlockUnaggregate(QueuedUnaggregate { + beacon_block_root, + process_fn: Box::new(|| {}), + }); + + // Process the event to enter it into the delay queue. + queue.handle_message(InboundEvent::Msg(att)); + + // Check that it is queued. + assert_eq!(queue.awaiting_attestations_per_root.len(), 1); + assert!( + queue + .awaiting_attestations_per_root + .contains_key(&beacon_block_root) + ); + + // Advance time to expire the attestation. 
+ advance_time(&queue.slot_clock, 2 * QUEUED_ATTESTATION_DELAY).await; + let ready_msg = queue.next().await.unwrap(); + assert!(matches!(ready_msg, InboundEvent::ReadyAttestation(_))); + queue.handle_message(ready_msg); + + // The entry for the block root should be gone. + assert!(queue.awaiting_attestations_per_root.is_empty()); + } + + // This is a regression test for a memory leak in `awaiting_lc_updates_per_parent_root`. + // See: https://github.com/sigp/lighthouse/pull/8065 + #[tokio::test] + async fn prune_awaiting_lc_updates_per_parent_root() { + create_test_tracing_subscriber(); + + let mut queue = test_queue(); + + // Pause time so it only advances manually + tokio::time::pause(); + + let parent_root = Hash256::repeat_byte(0xaf); + + // Insert a light client optimistic update. + let msg = + ReprocessQueueMessage::UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate { + parent_root, + process_fn: Box::new(|| {}), + }); + + // Process the event to enter it into the delay queue. + queue.handle_message(InboundEvent::Msg(msg)); + + // Check that it is queued. + assert_eq!(queue.awaiting_lc_updates_per_parent_root.len(), 1); + assert!( + queue + .awaiting_lc_updates_per_parent_root + .contains_key(&parent_root) + ); + + // Advance time to expire the update. + advance_time(&queue.slot_clock, 2 * QUEUED_LIGHT_CLIENT_UPDATE_DELAY).await; + let ready_msg = queue.next().await.unwrap(); + assert!(matches!(ready_msg, InboundEvent::ReadyLightClientUpdate(_))); + queue.handle_message(ready_msg); + + // The entry for the parent root should be gone. + assert!(queue.awaiting_lc_updates_per_parent_root.is_empty()); + } }