Fix reprocess queue memory leak (#8065)

Fix a memory leak in the reprocess queue.


If the vec of attestation IDs for a block is never evicted from the reprocess queue by a `BlockImported` event, it stays in the map forever, consuming memory. The fix is to remove the map entry when its last attestation times out. We do the same for light client updates.

In practice this will only occur if there is a race between adding an attestation to the queue and processing the `BlockImported` event, or if there are attestations for block roots that we never import (e.g. random block roots, block roots of invalid blocks).


Co-Authored-By: Michael Sproul <michael@sigmaprime.io>
This commit is contained in:
Michael Sproul
2025-09-18 15:16:59 +10:00
committed by GitHub
parent 521be2b757
commit 684632df73

View File

@@ -37,7 +37,9 @@ const TASK_NAME: &str = "beacon_processor_reprocess_queue";
// NOTE(review): `GOSSIP_BLOCKS` and `RPC_BLOCKS` are presumably metric labels for the
// queued gossip/RPC blocks; their use is not visible in this view, so confirm.
const GOSSIP_BLOCKS: &str = "gossip_blocks";
const RPC_BLOCKS: &str = "rpc_blocks";
/// Metric label under which the length of `attestations_delay_queue` is reported.
const ATTESTATIONS: &str = "attestations";
/// Metric label under which the number of keys in `awaiting_attestations_per_root` is
/// reported, so that unbounded growth of that map is observable.
const ATTESTATIONS_PER_ROOT: &str = "attestations_per_root";
/// Metric label under which the length of `lc_updates_delay_queue` is reported.
const LIGHT_CLIENT_UPDATES: &str = "lc_updates";
/// Metric label under which the number of keys in `awaiting_lc_updates_per_parent_root`
/// is reported, so that unbounded growth of that map is observable.
const LIGHT_CLIENT_UPDATES_PER_PARENT_ROOT: &str = "lc_updates_per_parent_root";
/// Queue blocks for re-processing with an `ADDITIONAL_QUEUED_BLOCK_DELAY` after the slot starts.
/// This is to account for any slight drift in the system clock.
@@ -829,10 +831,19 @@ impl<S: SlotClock> ReprocessQueue<S> {
);
}
if let Some(queued_atts) = self.awaiting_attestations_per_root.get_mut(&root)
&& let Some(index) = queued_atts.iter().position(|&id| id == queued_id)
if let Entry::Occupied(mut queued_atts) =
self.awaiting_attestations_per_root.entry(root)
&& let Some(index) =
queued_atts.get().iter().position(|&id| id == queued_id)
{
queued_atts.swap_remove(index);
let queued_atts_mut = queued_atts.get_mut();
queued_atts_mut.swap_remove(index);
// If the vec is empty after this attestation's removal, we need to delete
// the entry to prevent bloating the hashmap indefinitely.
if queued_atts_mut.is_empty() {
queued_atts.remove_entry();
}
}
}
}
@@ -853,13 +864,19 @@ impl<S: SlotClock> ReprocessQueue<S> {
error!("Failed to send scheduled light client optimistic update");
}
if let Some(queued_lc_updates) = self
.awaiting_lc_updates_per_parent_root
.get_mut(&parent_root)
&& let Some(index) =
queued_lc_updates.iter().position(|&id| id == queued_id)
if let Entry::Occupied(mut queued_lc_updates) =
self.awaiting_lc_updates_per_parent_root.entry(parent_root)
&& let Some(index) = queued_lc_updates
.get()
.iter()
.position(|&id| id == queued_id)
{
queued_lc_updates.swap_remove(index);
let queued_lc_updates_mut = queued_lc_updates.get_mut();
queued_lc_updates_mut.swap_remove(index);
if queued_lc_updates_mut.is_empty() {
queued_lc_updates.remove_entry();
}
}
}
}
@@ -929,11 +946,21 @@ impl<S: SlotClock> ReprocessQueue<S> {
&[ATTESTATIONS],
self.attestations_delay_queue.len() as i64,
);
metrics::set_gauge_vec(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
&[ATTESTATIONS_PER_ROOT],
self.awaiting_attestations_per_root.len() as i64,
);
metrics::set_gauge_vec(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
&[LIGHT_CLIENT_UPDATES],
self.lc_updates_delay_queue.len() as i64,
);
metrics::set_gauge_vec(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
&[LIGHT_CLIENT_UPDATES_PER_PARENT_ROOT],
self.awaiting_lc_updates_per_parent_root.len() as i64,
);
}
fn recompute_next_backfill_batch_event(&mut self) {
@@ -979,6 +1006,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
#[cfg(test)]
mod tests {
use super::*;
use crate::BeaconProcessorConfig;
use logging::create_test_tracing_subscriber;
use slot_clock::{ManualSlotClock, TestingSlotClock};
use std::ops::Add;
@@ -1101,4 +1129,97 @@ mod tests {
Duration::from_secs(slot_duration),
)
}
/// Builds a `ReprocessQueue` backed by the manual testing slot clock for unit tests.
///
/// The receiving half of the ready-work channel and the sending half of the
/// reprocess-work channel are discarded as soon as the channels are created, so
/// tests drive the returned queue by calling its methods directly.
fn test_queue() -> ReprocessQueue<ManualSlotClock> {
    create_test_tracing_subscriber();

    // Both channels share the default scheduled-work queue capacity.
    let capacity = BeaconProcessorConfig::default().max_scheduled_work_queue_len;
    let (ready_tx, _) = mpsc::channel::<ReadyWork>(capacity);
    let (_, reprocess_rx) = mpsc::channel::<ReprocessQueueMessage>(capacity);
    let clock = Arc::new(testing_slot_clock(12));

    ReprocessQueue::new(ready_tx, reprocess_rx, clock)
}
// This is a regression test for a memory leak in `awaiting_attestations_per_root`.
// See: https://github.com/sigp/lighthouse/pull/8065
//
// Scenario: one unaggregated attestation is queued for a block root, no `BlockImported`
// event is ever delivered for that root, and the attestation is left to time out. Once
// the timed-out attestation has been handled, the map must hold no entry for the root.
#[tokio::test]
async fn prune_awaiting_attestations_per_root() {
create_test_tracing_subscriber();
let mut queue = test_queue();
// Pause time so it only advances manually
tokio::time::pause();
let beacon_block_root = Hash256::repeat_byte(0xaf);
// Insert an attestation.
// The processing closure is a no-op: only the queue's bookkeeping is checked here.
let att = ReprocessQueueMessage::UnknownBlockUnaggregate(QueuedUnaggregate {
beacon_block_root,
process_fn: Box::new(|| {}),
});
// Process the event to enter it into the delay queue.
queue.handle_message(InboundEvent::Msg(att));
// Check that it is queued.
assert_eq!(queue.awaiting_attestations_per_root.len(), 1);
assert!(
queue
.awaiting_attestations_per_root
.contains_key(&beacon_block_root)
);
// Advance time to expire the attestation.
// Twice the queueing delay is used so the expiry is comfortably in the past.
advance_time(&queue.slot_clock, 2 * QUEUED_ATTESTATION_DELAY).await;
// Polling the queue must now yield the timed-out attestation.
let ready_msg = queue.next().await.unwrap();
assert!(matches!(ready_msg, InboundEvent::ReadyAttestation(_)));
// Handling the ready event is the step that removes the ID from the per-root vec.
queue.handle_message(ready_msg);
// The entry for the block root should be gone.
assert!(queue.awaiting_attestations_per_root.is_empty());
}
// This is a regression test for a memory leak in `awaiting_lc_updates_per_parent_root`.
// See: https://github.com/sigp/lighthouse/pull/8065
//
// Scenario: one light client optimistic update is queued for a parent root, no
// `BlockImported` event is ever delivered for that root, and the update is left to time
// out. Once the timed-out update has been handled, the map must hold no entry for the
// parent root.
#[tokio::test]
async fn prune_awaiting_lc_updates_per_parent_root() {
create_test_tracing_subscriber();
let mut queue = test_queue();
// Pause time so it only advances manually
tokio::time::pause();
let parent_root = Hash256::repeat_byte(0xaf);
// Insert a light client optimistic update.
// The processing closure is a no-op: only the queue's bookkeeping is checked here.
let msg =
ReprocessQueueMessage::UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate {
parent_root,
process_fn: Box::new(|| {}),
});
// Process the event to enter it into the delay queue.
queue.handle_message(InboundEvent::Msg(msg));
// Check that it is queued.
assert_eq!(queue.awaiting_lc_updates_per_parent_root.len(), 1);
assert!(
queue
.awaiting_lc_updates_per_parent_root
.contains_key(&parent_root)
);
// Advance time to expire the update.
// Twice the queueing delay is used so the expiry is comfortably in the past.
advance_time(&queue.slot_clock, 2 * QUEUED_LIGHT_CLIENT_UPDATE_DELAY).await;
// Polling the queue must now yield the timed-out light client update.
let ready_msg = queue.next().await.unwrap();
assert!(matches!(ready_msg, InboundEvent::ReadyLightClientUpdate(_)));
// Handling the ready event is the step that removes the ID from the per-parent-root vec.
queue.handle_message(ready_msg);
// The entry for the parent root should be gone.
assert!(queue.awaiting_lc_updates_per_parent_root.is_empty());
}
}