Mallory - Single commit

This commit is contained in:
Age Manning
2025-03-25 16:53:10 +11:00
parent b8178515cd
commit e2acce9468
118 changed files with 4753 additions and 3938 deletions

View File

@@ -59,7 +59,7 @@ use std::sync::Arc;
use std::task::Context;
use std::time::{Duration, Instant};
use strum::IntoStaticStr;
use task_executor::TaskExecutor;
use task_executor::{RayonPoolType, TaskExecutor};
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tracing::{debug, error, trace, warn};
@@ -181,7 +181,7 @@ impl BeaconProcessorQueueLengths {
// We don't request more than `PARENT_DEPTH_TOLERANCE` (32) lookups, so we can limit
// this queue size. With 48 max blobs per block, each column sidecar list could be up to 12MB.
rpc_custody_column_queue: 64,
column_reconstruction_queue: 64,
column_reconstruction_queue: 1,
chain_segment_queue: 64,
backfill_chain_segment: 64,
gossip_block_queue: 1024,
@@ -603,7 +603,7 @@ pub enum Work<E: EthSpec> {
process_fn: BlockingFn,
},
ChainSegment(AsyncFn),
ChainSegmentBackfill(AsyncFn),
ChainSegmentBackfill(BlockingFn),
Status(BlockingFn),
BlocksByRangeRequest(AsyncFn),
BlocksByRootsRequest(AsyncFn),
@@ -867,7 +867,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
let mut rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue);
let mut rpc_custody_column_queue = FifoQueue::new(queue_lengths.rpc_custody_column_queue);
let mut column_reconstruction_queue =
FifoQueue::new(queue_lengths.column_reconstruction_queue);
LifoQueue::new(queue_lengths.column_reconstruction_queue);
let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue);
let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment);
let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue);
@@ -1354,9 +1354,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::RpcCustodyColumn { .. } => {
rpc_custody_column_queue.push(work, work_id)
}
Work::ColumnReconstruction(_) => {
column_reconstruction_queue.push(work, work_id)
}
Work::ColumnReconstruction(_) => column_reconstruction_queue.push(work),
Work::ChainSegment { .. } => chain_segment_queue.push(work, work_id),
Work::ChainSegmentBackfill { .. } => {
backfill_chain_segment.push(work, work_id)
@@ -1605,7 +1603,14 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => {
task_spawner.spawn_async(work)
}
Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
Work::ChainSegmentBackfill(process_fn) => {
if self.config.enable_backfill_rate_limiting {
task_spawner.spawn_blocking_with_rayon(RayonPoolType::LowPriority, process_fn)
} else {
// Use the global rayon thread pool if backfill rate limiting is disabled.
task_spawner.spawn_blocking(process_fn)
}
}
Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn),
BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn),
@@ -1667,6 +1672,21 @@ impl TaskSpawner {
WORKER_TASK_NAME,
)
}
/// Spawns a blocking task on a rayon thread pool, dropping the `SendOnDrop` after task completion.
fn spawn_blocking_with_rayon<F>(self, rayon_pool_type: RayonPoolType, task: F)
where
F: FnOnce() + Send + 'static,
{
self.executor.spawn_blocking_with_rayon(
move || {
task();
drop(self.send_idle_on_drop)
},
rayon_pool_type,
WORKER_TASK_NAME,
)
}
}
/// This struct will send a message on `self.tx` when it is dropped. An error will be logged

View File

@@ -37,7 +37,9 @@ const TASK_NAME: &str = "beacon_processor_reprocess_queue";
const GOSSIP_BLOCKS: &str = "gossip_blocks";
const RPC_BLOCKS: &str = "rpc_blocks";
const ATTESTATIONS: &str = "attestations";
const ATTESTATIONS_PER_ROOT: &str = "attestations_per_root";
const LIGHT_CLIENT_UPDATES: &str = "lc_updates";
const LIGHT_CLIENT_UPDATES_PER_PARENT_ROOT: &str = "lc_updates_per_parent_root";
/// Queue blocks for re-processing with an `ADDITIONAL_QUEUED_BLOCK_DELAY` after the slot starts.
/// This is to account for any slight drift in the system clock.
@@ -171,7 +173,7 @@ pub struct IgnoredRpcBlock {
}
/// A backfill batch work that has been queued for processing later.
pub struct QueuedBackfillBatch(pub AsyncFn);
pub struct QueuedBackfillBatch(pub BlockingFn);
pub struct QueuedColumnReconstruction {
pub block_root: Hash256,
@@ -829,10 +831,19 @@ impl<S: SlotClock> ReprocessQueue<S> {
);
}
if let Some(queued_atts) = self.awaiting_attestations_per_root.get_mut(&root)
&& let Some(index) = queued_atts.iter().position(|&id| id == queued_id)
if let Entry::Occupied(mut queued_atts) =
self.awaiting_attestations_per_root.entry(root)
&& let Some(index) =
queued_atts.get().iter().position(|&id| id == queued_id)
{
queued_atts.swap_remove(index);
let queued_atts_mut = queued_atts.get_mut();
queued_atts_mut.swap_remove(index);
// If the vec is empty after this attestation's removal, we need to delete
// the entry to prevent bloating the hashmap indefinitely.
if queued_atts_mut.is_empty() {
queued_atts.remove_entry();
}
}
}
}
@@ -853,13 +864,19 @@ impl<S: SlotClock> ReprocessQueue<S> {
error!("Failed to send scheduled light client optimistic update");
}
if let Some(queued_lc_updates) = self
.awaiting_lc_updates_per_parent_root
.get_mut(&parent_root)
&& let Some(index) =
queued_lc_updates.iter().position(|&id| id == queued_id)
if let Entry::Occupied(mut queued_lc_updates) =
self.awaiting_lc_updates_per_parent_root.entry(parent_root)
&& let Some(index) = queued_lc_updates
.get()
.iter()
.position(|&id| id == queued_id)
{
queued_lc_updates.swap_remove(index);
let queued_lc_updates_mut = queued_lc_updates.get_mut();
queued_lc_updates_mut.swap_remove(index);
if queued_lc_updates_mut.is_empty() {
queued_lc_updates.remove_entry();
}
}
}
}
@@ -929,11 +946,21 @@ impl<S: SlotClock> ReprocessQueue<S> {
&[ATTESTATIONS],
self.attestations_delay_queue.len() as i64,
);
metrics::set_gauge_vec(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
&[ATTESTATIONS_PER_ROOT],
self.awaiting_attestations_per_root.len() as i64,
);
metrics::set_gauge_vec(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
&[LIGHT_CLIENT_UPDATES],
self.lc_updates_delay_queue.len() as i64,
);
metrics::set_gauge_vec(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
&[LIGHT_CLIENT_UPDATES_PER_PARENT_ROOT],
self.awaiting_lc_updates_per_parent_root.len() as i64,
);
}
fn recompute_next_backfill_batch_event(&mut self) {
@@ -979,6 +1006,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
#[cfg(test)]
mod tests {
use super::*;
use crate::BeaconProcessorConfig;
use logging::create_test_tracing_subscriber;
use slot_clock::{ManualSlotClock, TestingSlotClock};
use std::ops::Add;
@@ -1056,7 +1084,7 @@ mod tests {
// Now queue a backfill sync batch.
work_reprocessing_tx
.try_send(ReprocessQueueMessage::BackfillSync(QueuedBackfillBatch(
Box::pin(async {}),
Box::new(|| {}),
)))
.unwrap();
tokio::task::yield_now().await;
@@ -1101,4 +1129,97 @@ mod tests {
Duration::from_secs(slot_duration),
)
}
fn test_queue() -> ReprocessQueue<ManualSlotClock> {
create_test_tracing_subscriber();
let config = BeaconProcessorConfig::default();
let (ready_work_tx, _) = mpsc::channel::<ReadyWork>(config.max_scheduled_work_queue_len);
let (_, reprocess_work_rx) =
mpsc::channel::<ReprocessQueueMessage>(config.max_scheduled_work_queue_len);
let slot_clock = Arc::new(testing_slot_clock(12));
ReprocessQueue::new(ready_work_tx, reprocess_work_rx, slot_clock)
}
// This is a regression test for a memory leak in `awaiting_attestations_per_root`.
// See: https://github.com/sigp/lighthouse/pull/8065
#[tokio::test]
async fn prune_awaiting_attestations_per_root() {
create_test_tracing_subscriber();
let mut queue = test_queue();
// Pause time so it only advances manually
tokio::time::pause();
let beacon_block_root = Hash256::repeat_byte(0xaf);
// Insert an attestation.
let att = ReprocessQueueMessage::UnknownBlockUnaggregate(QueuedUnaggregate {
beacon_block_root,
process_fn: Box::new(|| {}),
});
// Process the event to enter it into the delay queue.
queue.handle_message(InboundEvent::Msg(att));
// Check that it is queued.
assert_eq!(queue.awaiting_attestations_per_root.len(), 1);
assert!(
queue
.awaiting_attestations_per_root
.contains_key(&beacon_block_root)
);
// Advance time to expire the attestation.
advance_time(&queue.slot_clock, 2 * QUEUED_ATTESTATION_DELAY).await;
let ready_msg = queue.next().await.unwrap();
assert!(matches!(ready_msg, InboundEvent::ReadyAttestation(_)));
queue.handle_message(ready_msg);
// The entry for the block root should be gone.
assert!(queue.awaiting_attestations_per_root.is_empty());
}
// This is a regression test for a memory leak in `awaiting_lc_updates_per_parent_root`.
// See: https://github.com/sigp/lighthouse/pull/8065
#[tokio::test]
async fn prune_awaiting_lc_updates_per_parent_root() {
create_test_tracing_subscriber();
let mut queue = test_queue();
// Pause time so it only advances manually
tokio::time::pause();
let parent_root = Hash256::repeat_byte(0xaf);
// Insert a light client optimistic update.
let msg =
ReprocessQueueMessage::UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate {
parent_root,
process_fn: Box::new(|| {}),
});
// Process the event to enter it into the delay queue.
queue.handle_message(InboundEvent::Msg(msg));
// Check that it is queued.
assert_eq!(queue.awaiting_lc_updates_per_parent_root.len(), 1);
assert!(
queue
.awaiting_lc_updates_per_parent_root
.contains_key(&parent_root)
);
// Advance time to expire the update.
advance_time(&queue.slot_clock, 2 * QUEUED_LIGHT_CLIENT_UPDATE_DELAY).await;
let ready_msg = queue.next().await.unwrap();
assert!(matches!(ready_msg, InboundEvent::ReadyLightClientUpdate(_)));
queue.handle_message(ready_msg);
// The entry for the parent root should be gone.
assert!(queue.awaiting_lc_updates_per_parent_root.is_empty());
}
}