diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index ae785e5127..14460a93fc 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -57,7 +57,7 @@ use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::task::Context; -use std::time::Duration; +use std::time::{Duration, Instant}; use strum::IntoStaticStr; use task_executor::TaskExecutor; use tokio::sync::mpsc; @@ -627,7 +627,7 @@ impl fmt::Debug for Work { } } -#[derive(IntoStaticStr, PartialEq, Eq, Debug)] +#[derive(IntoStaticStr, PartialEq, Eq, Debug, Clone)] #[strum(serialize_all = "snake_case")] pub enum WorkType { GossipAttestation, @@ -736,9 +736,9 @@ enum InboundEvent { /// A worker has completed a task and is free. WorkerIdle, /// There is new work to be done. - WorkEvent(WorkEvent), + WorkEvent((WorkEvent, Instant)), /// A work event that was queued for re-processing has become ready. - ReprocessingWork(WorkEvent), + ReprocessingWork((WorkEvent, Instant)), } /// Combines the various incoming event streams for the `BeaconProcessor` into a single stream. @@ -747,7 +747,7 @@ enum InboundEvent { /// control (specifically in the ordering of event processing). struct InboundEvents { /// Used by workers when they finish a task. - idle_rx: mpsc::Receiver<()>, + idle_rx: mpsc::Receiver, /// Used by upstream processes to send new work to the `BeaconProcessor`. event_rx: mpsc::Receiver>, /// Used internally for queuing work ready to be re-processed. @@ -761,7 +761,7 @@ impl Stream for InboundEvents { // Always check for idle workers before anything else. This allows us to ensure that a big // stream of new events doesn't suppress the processing of existing events. match self.idle_rx.poll_recv(cx) { - Poll::Ready(Some(())) => { + Poll::Ready(Some(_)) => { return Poll::Ready(Some(InboundEvent::WorkerIdle)); } Poll::Ready(None) => { @@ -774,7 +774,10 @@ impl Stream for InboundEvents { // block is required to successfully process some new work. match self.ready_work_rx.poll_recv(cx) { Poll::Ready(Some(ready_work)) => { - return Poll::Ready(Some(InboundEvent::ReprocessingWork(ready_work.into()))); + return Poll::Ready(Some(InboundEvent::ReprocessingWork(( + ready_work.into(), + Instant::now(), + )))); } Poll::Ready(None) => { return Poll::Ready(None); @@ -784,7 +787,7 @@ impl Stream for InboundEvents { match self.event_rx.poll_recv(cx) { Poll::Ready(Some(event)) => { - return Poll::Ready(Some(InboundEvent::WorkEvent(event))); + return Poll::Ready(Some(InboundEvent::WorkEvent((event, Instant::now())))); } Poll::Ready(None) => { return Poll::Ready(None); @@ -829,7 +832,7 @@ impl BeaconProcessor { queue_lengths: BeaconProcessorQueueLengths, ) -> Result<(), String> { // Used by workers to communicate that they are finished a task. - let (idle_tx, idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN); + let (idle_tx, idle_rx) = mpsc::channel::(MAX_IDLE_QUEUE_LEN); // Using LIFO queues for attestations since validator profits rely upon getting fresh // attestations into blocks. Additionally, later attestations contain more information than @@ -930,12 +933,14 @@ impl BeaconProcessor { let enable_backfill_rate_limiting = self.config.enable_backfill_rate_limiting; loop { - let work_event = match inbound_events.next().await { + let (work_event, created_timestamp) = match inbound_events.next().await { Some(InboundEvent::WorkerIdle) => { self.current_workers = self.current_workers.saturating_sub(1); - None + (None, Instant::now()) } - Some(InboundEvent::WorkEvent(event)) if enable_backfill_rate_limiting => { + Some(InboundEvent::WorkEvent((event, created_timestamp))) + if enable_backfill_rate_limiting => + { match QueuedBackfillBatch::try_from(event) { Ok(backfill_batch) => { match reprocess_work_tx @@ -952,7 +957,10 @@ impl BeaconProcessor { match reprocess_queue_message { ReprocessQueueMessage::BackfillSync( backfill_batch, - ) => Some(backfill_batch.into()), + ) => ( + Some(backfill_batch.into()), + created_timestamp, + ), other => { crit!( message_type = other.as_ref(), @@ -971,11 +979,13 @@ impl BeaconProcessor { } } } - Err(event) => Some(event), + Err(event) => (Some(event), created_timestamp), } } - Some(InboundEvent::WorkEvent(event)) - | Some(InboundEvent::ReprocessingWork(event)) => Some(event), + Some(InboundEvent::WorkEvent((event, created_timestamp))) + | Some(InboundEvent::ReprocessingWork((event, created_timestamp))) => { + (Some(event), created_timestamp) + } None => { debug!(msg = "stream ended", "Gossip processor stopped"); break; @@ -1245,7 +1255,7 @@ impl BeaconProcessor { if let Some(work_event) = work_event { let work_type = work_event.to_type(); - self.spawn_worker(work_event, idle_tx); + self.spawn_worker(work_event, created_timestamp, idle_tx); Some(work_type) } else { None @@ -1293,7 +1303,7 @@ impl BeaconProcessor { ) } } - _ if can_spawn => self.spawn_worker(work, idle_tx), + _ if can_spawn => self.spawn_worker(work, created_timestamp, idle_tx), Work::GossipAttestation { .. } => attestation_queue.push(work), // Attestation batches are formed internally within the // `BeaconProcessor`, they are not sent from external services. @@ -1486,8 +1496,22 @@ impl BeaconProcessor { /// Spawns a blocking worker thread to process some `Work`. /// /// Sends an message on `idle_tx` when the work is complete and the task is stopping. - fn spawn_worker(&mut self, work: Work, idle_tx: mpsc::Sender<()>) { + fn spawn_worker( + &mut self, + work: Work, + created_timestamp: Instant, + idle_tx: mpsc::Sender, + ) { let work_id = work.str_id(); + let work_type = work.to_type(); + + // This metric tracks how long a work event has been in the queue + metrics::observe_timer_vec( + &metrics::BEACON_PROCESSOR_QUEUE_TIME, + &[work_type.into()], + Instant::now() - created_timestamp, + ); + let worker_timer = metrics::start_timer_vec(&metrics::BEACON_PROCESSOR_WORKER_TIME, &[work_id]); metrics::inc_counter(&metrics::BEACON_PROCESSOR_WORKERS_SPAWNED_TOTAL); @@ -1502,6 +1526,7 @@ impl BeaconProcessor { // As such, this instantiation should happen as early in the function as possible. let send_idle_on_drop = SendOnDrop { tx: idle_tx, + work_type: work.to_type(), _worker_timer: worker_timer, }; @@ -1655,14 +1680,15 @@ impl TaskSpawner { /// /// https://doc.rust-lang.org/std/ops/trait.Drop.html#panics pub struct SendOnDrop { - tx: mpsc::Sender<()>, + tx: mpsc::Sender, + work_type: WorkType, // The field is unused, but it's here to ensure the timer is dropped once the task has finished. _worker_timer: Option, } impl Drop for SendOnDrop { fn drop(&mut self) { - if let Err(e) = self.tx.try_send(()) { + if let Err(e) = self.tx.try_send(self.work_type.clone()) { warn!( msg = "did not free worker, shutdown may be underway", error = %e, diff --git a/beacon_node/beacon_processor/src/metrics.rs b/beacon_node/beacon_processor/src/metrics.rs index 275875b1a4..b1d1d3dda6 100644 --- a/beacon_node/beacon_processor/src/metrics.rs +++ b/beacon_node/beacon_processor/src/metrics.rs @@ -128,3 +128,10 @@ pub static BEACON_PROCESSOR_SEND_ERROR_PER_WORK_TYPE: LazyLock> = LazyLock::new(|| { + try_create_histogram_vec( + "beacon_processor_queue_time", + "The delay between when a work event was queued in the beacon processor and when it was popped from the queue", + &["work_type"] + ) +});