mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-02 16:21:42 +00:00
Add metrics to track beacon processor queue times (#7808)
This PR adds a created_timestamp to the beacon processor send channel. When work items are sent through that channel `try_send` will forward the work event along with the current timestamp to the beacon processor. When the work event is completed the `Drop` impl for `SendOnDrop` will track the time it took from work event creation to its completion. Previously we only had data on how long a work event took to process, but didn't have data on how long it sat in the queue + how long it took to process.
This commit is contained in:
@@ -57,7 +57,7 @@ use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::Context;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
use strum::IntoStaticStr;
|
||||
use task_executor::TaskExecutor;
|
||||
use tokio::sync::mpsc;
|
||||
@@ -627,7 +627,7 @@ impl<E: EthSpec> fmt::Debug for Work<E> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(IntoStaticStr, PartialEq, Eq, Debug)]
|
||||
#[derive(IntoStaticStr, PartialEq, Eq, Debug, Clone)]
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
pub enum WorkType {
|
||||
GossipAttestation,
|
||||
@@ -736,9 +736,9 @@ enum InboundEvent<E: EthSpec> {
|
||||
/// A worker has completed a task and is free.
|
||||
WorkerIdle,
|
||||
/// There is new work to be done.
|
||||
WorkEvent(WorkEvent<E>),
|
||||
WorkEvent((WorkEvent<E>, Instant)),
|
||||
/// A work event that was queued for re-processing has become ready.
|
||||
ReprocessingWork(WorkEvent<E>),
|
||||
ReprocessingWork((WorkEvent<E>, Instant)),
|
||||
}
|
||||
|
||||
/// Combines the various incoming event streams for the `BeaconProcessor` into a single stream.
|
||||
@@ -747,7 +747,7 @@ enum InboundEvent<E: EthSpec> {
|
||||
/// control (specifically in the ordering of event processing).
|
||||
struct InboundEvents<E: EthSpec> {
|
||||
/// Used by workers when they finish a task.
|
||||
idle_rx: mpsc::Receiver<()>,
|
||||
idle_rx: mpsc::Receiver<WorkType>,
|
||||
/// Used by upstream processes to send new work to the `BeaconProcessor`.
|
||||
event_rx: mpsc::Receiver<WorkEvent<E>>,
|
||||
/// Used internally for queuing work ready to be re-processed.
|
||||
@@ -761,7 +761,7 @@ impl<E: EthSpec> Stream for InboundEvents<E> {
|
||||
// Always check for idle workers before anything else. This allows us to ensure that a big
|
||||
// stream of new events doesn't suppress the processing of existing events.
|
||||
match self.idle_rx.poll_recv(cx) {
|
||||
Poll::Ready(Some(())) => {
|
||||
Poll::Ready(Some(_)) => {
|
||||
return Poll::Ready(Some(InboundEvent::WorkerIdle));
|
||||
}
|
||||
Poll::Ready(None) => {
|
||||
@@ -774,7 +774,10 @@ impl<E: EthSpec> Stream for InboundEvents<E> {
|
||||
// block is required to successfully process some new work.
|
||||
match self.ready_work_rx.poll_recv(cx) {
|
||||
Poll::Ready(Some(ready_work)) => {
|
||||
return Poll::Ready(Some(InboundEvent::ReprocessingWork(ready_work.into())));
|
||||
return Poll::Ready(Some(InboundEvent::ReprocessingWork((
|
||||
ready_work.into(),
|
||||
Instant::now(),
|
||||
))));
|
||||
}
|
||||
Poll::Ready(None) => {
|
||||
return Poll::Ready(None);
|
||||
@@ -784,7 +787,7 @@ impl<E: EthSpec> Stream for InboundEvents<E> {
|
||||
|
||||
match self.event_rx.poll_recv(cx) {
|
||||
Poll::Ready(Some(event)) => {
|
||||
return Poll::Ready(Some(InboundEvent::WorkEvent(event)));
|
||||
return Poll::Ready(Some(InboundEvent::WorkEvent((event, Instant::now()))));
|
||||
}
|
||||
Poll::Ready(None) => {
|
||||
return Poll::Ready(None);
|
||||
@@ -829,7 +832,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
||||
queue_lengths: BeaconProcessorQueueLengths,
|
||||
) -> Result<(), String> {
|
||||
// Used by workers to communicate that they are finished a task.
|
||||
let (idle_tx, idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN);
|
||||
let (idle_tx, idle_rx) = mpsc::channel::<WorkType>(MAX_IDLE_QUEUE_LEN);
|
||||
|
||||
// Using LIFO queues for attestations since validator profits rely upon getting fresh
|
||||
// attestations into blocks. Additionally, later attestations contain more information than
|
||||
@@ -930,12 +933,14 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
||||
let enable_backfill_rate_limiting = self.config.enable_backfill_rate_limiting;
|
||||
|
||||
loop {
|
||||
let work_event = match inbound_events.next().await {
|
||||
let (work_event, created_timestamp) = match inbound_events.next().await {
|
||||
Some(InboundEvent::WorkerIdle) => {
|
||||
self.current_workers = self.current_workers.saturating_sub(1);
|
||||
None
|
||||
(None, Instant::now())
|
||||
}
|
||||
Some(InboundEvent::WorkEvent(event)) if enable_backfill_rate_limiting => {
|
||||
Some(InboundEvent::WorkEvent((event, created_timestamp)))
|
||||
if enable_backfill_rate_limiting =>
|
||||
{
|
||||
match QueuedBackfillBatch::try_from(event) {
|
||||
Ok(backfill_batch) => {
|
||||
match reprocess_work_tx
|
||||
@@ -952,7 +957,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
||||
match reprocess_queue_message {
|
||||
ReprocessQueueMessage::BackfillSync(
|
||||
backfill_batch,
|
||||
) => Some(backfill_batch.into()),
|
||||
) => (
|
||||
Some(backfill_batch.into()),
|
||||
created_timestamp,
|
||||
),
|
||||
other => {
|
||||
crit!(
|
||||
message_type = other.as_ref(),
|
||||
@@ -971,11 +979,13 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(event) => Some(event),
|
||||
Err(event) => (Some(event), created_timestamp),
|
||||
}
|
||||
}
|
||||
Some(InboundEvent::WorkEvent(event))
|
||||
| Some(InboundEvent::ReprocessingWork(event)) => Some(event),
|
||||
Some(InboundEvent::WorkEvent((event, created_timestamp)))
|
||||
| Some(InboundEvent::ReprocessingWork((event, created_timestamp))) => {
|
||||
(Some(event), created_timestamp)
|
||||
}
|
||||
None => {
|
||||
debug!(msg = "stream ended", "Gossip processor stopped");
|
||||
break;
|
||||
@@ -1245,7 +1255,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
||||
|
||||
if let Some(work_event) = work_event {
|
||||
let work_type = work_event.to_type();
|
||||
self.spawn_worker(work_event, idle_tx);
|
||||
self.spawn_worker(work_event, created_timestamp, idle_tx);
|
||||
Some(work_type)
|
||||
} else {
|
||||
None
|
||||
@@ -1293,7 +1303,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
||||
)
|
||||
}
|
||||
}
|
||||
_ if can_spawn => self.spawn_worker(work, idle_tx),
|
||||
_ if can_spawn => self.spawn_worker(work, created_timestamp, idle_tx),
|
||||
Work::GossipAttestation { .. } => attestation_queue.push(work),
|
||||
// Attestation batches are formed internally within the
|
||||
// `BeaconProcessor`, they are not sent from external services.
|
||||
@@ -1486,8 +1496,22 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
||||
/// Spawns a blocking worker thread to process some `Work`.
|
||||
///
|
||||
/// Sends an message on `idle_tx` when the work is complete and the task is stopping.
|
||||
fn spawn_worker(&mut self, work: Work<E>, idle_tx: mpsc::Sender<()>) {
|
||||
fn spawn_worker(
|
||||
&mut self,
|
||||
work: Work<E>,
|
||||
created_timestamp: Instant,
|
||||
idle_tx: mpsc::Sender<WorkType>,
|
||||
) {
|
||||
let work_id = work.str_id();
|
||||
let work_type = work.to_type();
|
||||
|
||||
// This metric tracks how long a work event has been in the queue
|
||||
metrics::observe_timer_vec(
|
||||
&metrics::BEACON_PROCESSOR_QUEUE_TIME,
|
||||
&[work_type.into()],
|
||||
Instant::now() - created_timestamp,
|
||||
);
|
||||
|
||||
let worker_timer =
|
||||
metrics::start_timer_vec(&metrics::BEACON_PROCESSOR_WORKER_TIME, &[work_id]);
|
||||
metrics::inc_counter(&metrics::BEACON_PROCESSOR_WORKERS_SPAWNED_TOTAL);
|
||||
@@ -1502,6 +1526,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
||||
// As such, this instantiation should happen as early in the function as possible.
|
||||
let send_idle_on_drop = SendOnDrop {
|
||||
tx: idle_tx,
|
||||
work_type: work.to_type(),
|
||||
_worker_timer: worker_timer,
|
||||
};
|
||||
|
||||
@@ -1655,14 +1680,15 @@ impl TaskSpawner {
|
||||
///
|
||||
/// https://doc.rust-lang.org/std/ops/trait.Drop.html#panics
|
||||
pub struct SendOnDrop {
|
||||
tx: mpsc::Sender<()>,
|
||||
tx: mpsc::Sender<WorkType>,
|
||||
work_type: WorkType,
|
||||
// The field is unused, but it's here to ensure the timer is dropped once the task has finished.
|
||||
_worker_timer: Option<metrics::HistogramTimer>,
|
||||
}
|
||||
|
||||
impl Drop for SendOnDrop {
|
||||
fn drop(&mut self) {
|
||||
if let Err(e) = self.tx.try_send(()) {
|
||||
if let Err(e) = self.tx.try_send(self.work_type.clone()) {
|
||||
warn!(
|
||||
msg = "did not free worker, shutdown may be underway",
|
||||
error = %e,
|
||||
|
||||
@@ -128,3 +128,10 @@ pub static BEACON_PROCESSOR_SEND_ERROR_PER_WORK_TYPE: LazyLock<Result<IntCounter
|
||||
&["type"],
|
||||
)
|
||||
});
|
||||
pub static BEACON_PROCESSOR_QUEUE_TIME: LazyLock<Result<HistogramVec>> = LazyLock::new(|| {
|
||||
try_create_histogram_vec(
|
||||
"beacon_processor_queue_time",
|
||||
"The delay between when a work event was queued in the beacon processor and when it was popped from the queue",
|
||||
&["work_type"]
|
||||
)
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user