diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index f506f0bb94..cd5a1d6cff 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -57,6 +57,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::Context; use std::time::Duration; +use strum::IntoStaticStr; use task_executor::TaskExecutor; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; @@ -219,46 +220,6 @@ const DEFAULT_MAX_GOSSIP_AGGREGATE_BATCH_SIZE: usize = 64; /// Unique IDs used for metrics and testing. pub const WORKER_FREED: &str = "worker_freed"; pub const NOTHING_TO_DO: &str = "nothing_to_do"; -pub const GOSSIP_ATTESTATION: &str = "gossip_attestation"; -pub const GOSSIP_ATTESTATION_BATCH: &str = "gossip_attestation_batch"; -pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate"; -pub const GOSSIP_AGGREGATE_BATCH: &str = "gossip_aggregate_batch"; -pub const GOSSIP_BLOCK: &str = "gossip_block"; -pub const GOSSIP_BLOBS_SIDECAR: &str = "gossip_blobs_sidecar"; -pub const GOSSIP_BLOBS_COLUMN_SIDECAR: &str = "gossip_blobs_column_sidecar"; -pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block"; -pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit"; -pub const GOSSIP_PROPOSER_SLASHING: &str = "gossip_proposer_slashing"; -pub const GOSSIP_ATTESTER_SLASHING: &str = "gossip_attester_slashing"; -pub const GOSSIP_SYNC_SIGNATURE: &str = "gossip_sync_signature"; -pub const GOSSIP_SYNC_CONTRIBUTION: &str = "gossip_sync_contribution"; -pub const GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update"; -pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update"; -pub const RPC_BLOCK: &str = "rpc_block"; -pub const IGNORED_RPC_BLOCK: &str = "ignored_rpc_block"; -pub const RPC_BLOBS: &str = "rpc_blob"; -pub const RPC_CUSTODY_COLUMN: &str = "rpc_custody_column"; -pub const RPC_VERIFY_DATA_COLUMNS: &str = "rpc_verify_data_columns"; -pub const 
SAMPLING_RESULT: &str = "sampling_result"; -pub const CHAIN_SEGMENT: &str = "chain_segment"; -pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill"; -pub const STATUS_PROCESSING: &str = "status_processing"; -pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request"; -pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request"; -pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request"; -pub const BLOBS_BY_ROOTS_REQUEST: &str = "blobs_by_roots_request"; -pub const DATA_COLUMNS_BY_ROOTS_REQUEST: &str = "data_columns_by_roots_request"; -pub const DATA_COLUMNS_BY_RANGE_REQUEST: &str = "data_columns_by_range_request"; -pub const LIGHT_CLIENT_BOOTSTRAP_REQUEST: &str = "light_client_bootstrap"; -pub const LIGHT_CLIENT_FINALITY_UPDATE_REQUEST: &str = "light_client_finality_update_request"; -pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE_REQUEST: &str = "light_client_optimistic_update_request"; -pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; -pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; -pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update"; -pub const UNKNOWN_BLOCK_SAMPLING_REQUEST: &str = "unknown_block_sampling_request"; -pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change"; -pub const API_REQUEST_P0: &str = "api_request_p0"; -pub const API_REQUEST_P1: &str = "api_request_p1"; #[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] pub struct BeaconProcessorConfig { @@ -454,9 +415,14 @@ pub struct WorkEvent { } impl WorkEvent { + /// Get a representation of the type of work this `WorkEvent` contains. + pub fn work_type(&self) -> WorkType { + self.work.to_type() + } + /// Get a `str` representation of the type of work this `WorkEvent` contains. 
- pub fn work_type(&self) -> &'static str { - self.work.str_id() + pub fn work_type_str(&self) -> &'static str { + self.work_type().into() } } @@ -555,7 +521,7 @@ impl BeaconProcessorSend { Err(e) => { metrics::inc_counter_vec( &metrics::BEACON_PROCESSOR_SEND_ERROR_PER_WORK_TYPE, - &[work_type], + &[work_type.into()], ); Err(e) } @@ -651,54 +617,109 @@ pub enum Work { impl fmt::Debug for Work { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.str_id()) + write!(f, "{}", Into::<&'static str>::into(self.to_type())) } } +#[derive(IntoStaticStr, PartialEq, Eq, Debug)] +#[strum(serialize_all = "snake_case")] +pub enum WorkType { + GossipAttestation, + UnknownBlockAttestation, + GossipAttestationBatch, + GossipAggregate, + UnknownBlockAggregate, + UnknownLightClientOptimisticUpdate, + UnknownBlockSamplingRequest, + GossipAggregateBatch, + GossipBlock, + GossipBlobSidecar, + GossipDataColumnSidecar, + DelayedImportBlock, + GossipVoluntaryExit, + GossipProposerSlashing, + GossipAttesterSlashing, + GossipSyncSignature, + GossipSyncContribution, + GossipLightClientFinalityUpdate, + GossipLightClientOptimisticUpdate, + RpcBlock, + RpcBlobs, + RpcCustodyColumn, + RpcVerifyDataColumn, + SamplingResult, + IgnoredRpcBlock, + ChainSegment, + ChainSegmentBackfill, + Status, + BlocksByRangeRequest, + BlocksByRootsRequest, + BlobsByRangeRequest, + BlobsByRootsRequest, + DataColumnsByRootsRequest, + DataColumnsByRangeRequest, + GossipBlsToExecutionChange, + LightClientBootstrapRequest, + LightClientOptimisticUpdateRequest, + LightClientFinalityUpdateRequest, + ApiRequestP0, + ApiRequestP1, +} + impl Work { - /// Provides a `&str` that uniquely identifies each enum variant. fn str_id(&self) -> &'static str { + self.to_type().into() + } + + /// Provides a `&str` that uniquely identifies each enum variant. + fn to_type(&self) -> WorkType { match self { - Work::GossipAttestation { .. } => GOSSIP_ATTESTATION, - Work::GossipAttestationBatch { .. 
} => GOSSIP_ATTESTATION_BATCH, - Work::GossipAggregate { .. } => GOSSIP_AGGREGATE, - Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH, - Work::GossipBlock(_) => GOSSIP_BLOCK, - Work::GossipBlobSidecar(_) => GOSSIP_BLOBS_SIDECAR, - Work::GossipDataColumnSidecar(_) => GOSSIP_BLOBS_COLUMN_SIDECAR, - Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK, - Work::GossipVoluntaryExit(_) => GOSSIP_VOLUNTARY_EXIT, - Work::GossipProposerSlashing(_) => GOSSIP_PROPOSER_SLASHING, - Work::GossipAttesterSlashing(_) => GOSSIP_ATTESTER_SLASHING, - Work::GossipSyncSignature(_) => GOSSIP_SYNC_SIGNATURE, - Work::GossipSyncContribution(_) => GOSSIP_SYNC_CONTRIBUTION, - Work::GossipLightClientFinalityUpdate(_) => GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE, - Work::GossipLightClientOptimisticUpdate(_) => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE, - Work::RpcBlock { .. } => RPC_BLOCK, - Work::RpcBlobs { .. } => RPC_BLOBS, - Work::RpcCustodyColumn { .. } => RPC_CUSTODY_COLUMN, - Work::RpcVerifyDataColumn(_) => RPC_VERIFY_DATA_COLUMNS, - Work::SamplingResult(_) => SAMPLING_RESULT, - Work::IgnoredRpcBlock { .. } => IGNORED_RPC_BLOCK, - Work::ChainSegment { .. } => CHAIN_SEGMENT, - Work::ChainSegmentBackfill(_) => CHAIN_SEGMENT_BACKFILL, - Work::Status(_) => STATUS_PROCESSING, - Work::BlocksByRangeRequest(_) => BLOCKS_BY_RANGE_REQUEST, - Work::BlocksByRootsRequest(_) => BLOCKS_BY_ROOTS_REQUEST, - Work::BlobsByRangeRequest(_) => BLOBS_BY_RANGE_REQUEST, - Work::BlobsByRootsRequest(_) => BLOBS_BY_ROOTS_REQUEST, - Work::DataColumnsByRootsRequest(_) => DATA_COLUMNS_BY_ROOTS_REQUEST, - Work::DataColumnsByRangeRequest(_) => DATA_COLUMNS_BY_RANGE_REQUEST, - Work::LightClientBootstrapRequest(_) => LIGHT_CLIENT_BOOTSTRAP_REQUEST, - Work::LightClientOptimisticUpdateRequest(_) => LIGHT_CLIENT_OPTIMISTIC_UPDATE_REQUEST, - Work::LightClientFinalityUpdateRequest(_) => LIGHT_CLIENT_FINALITY_UPDATE_REQUEST, - Work::UnknownBlockAttestation { .. 
} => UNKNOWN_BLOCK_ATTESTATION, - Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, - Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE, - Work::UnknownBlockSamplingRequest { .. } => UNKNOWN_BLOCK_SAMPLING_REQUEST, - Work::GossipBlsToExecutionChange(_) => GOSSIP_BLS_TO_EXECUTION_CHANGE, - Work::ApiRequestP0 { .. } => API_REQUEST_P0, - Work::ApiRequestP1 { .. } => API_REQUEST_P1, + Work::GossipAttestation { .. } => WorkType::GossipAttestation, + Work::GossipAttestationBatch { .. } => WorkType::GossipAttestationBatch, + Work::GossipAggregate { .. } => WorkType::GossipAggregate, + Work::GossipAggregateBatch { .. } => WorkType::GossipAggregateBatch, + Work::GossipBlock(_) => WorkType::GossipBlock, + Work::GossipBlobSidecar(_) => WorkType::GossipBlobSidecar, + Work::GossipDataColumnSidecar(_) => WorkType::GossipDataColumnSidecar, + Work::DelayedImportBlock { .. } => WorkType::DelayedImportBlock, + Work::GossipVoluntaryExit(_) => WorkType::GossipVoluntaryExit, + Work::GossipProposerSlashing(_) => WorkType::GossipProposerSlashing, + Work::GossipAttesterSlashing(_) => WorkType::GossipAttesterSlashing, + Work::GossipSyncSignature(_) => WorkType::GossipSyncSignature, + Work::GossipSyncContribution(_) => WorkType::GossipSyncContribution, + Work::GossipLightClientFinalityUpdate(_) => WorkType::GossipLightClientFinalityUpdate, + Work::GossipLightClientOptimisticUpdate(_) => { + WorkType::GossipLightClientOptimisticUpdate + } + Work::GossipBlsToExecutionChange(_) => WorkType::GossipBlsToExecutionChange, + Work::RpcBlock { .. } => WorkType::RpcBlock, + Work::RpcBlobs { .. } => WorkType::RpcBlobs, + Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn, + Work::RpcVerifyDataColumn { .. } => WorkType::RpcVerifyDataColumn, + Work::SamplingResult { .. } => WorkType::SamplingResult, + Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock, + Work::ChainSegment { .. 
} => WorkType::ChainSegment, + Work::ChainSegmentBackfill(_) => WorkType::ChainSegmentBackfill, + Work::Status(_) => WorkType::Status, + Work::BlocksByRangeRequest(_) => WorkType::BlocksByRangeRequest, + Work::BlocksByRootsRequest(_) => WorkType::BlocksByRootsRequest, + Work::BlobsByRangeRequest(_) => WorkType::BlobsByRangeRequest, + Work::BlobsByRootsRequest(_) => WorkType::BlobsByRootsRequest, + Work::DataColumnsByRootsRequest(_) => WorkType::DataColumnsByRootsRequest, + Work::DataColumnsByRangeRequest(_) => WorkType::DataColumnsByRangeRequest, + Work::LightClientBootstrapRequest(_) => WorkType::LightClientBootstrapRequest, + Work::LightClientOptimisticUpdateRequest(_) => { + WorkType::LightClientOptimisticUpdateRequest + } + Work::LightClientFinalityUpdateRequest(_) => WorkType::LightClientFinalityUpdateRequest, + Work::UnknownBlockAttestation { .. } => WorkType::UnknownBlockAttestation, + Work::UnknownBlockAggregate { .. } => WorkType::UnknownBlockAggregate, + Work::UnknownBlockSamplingRequest { .. } => WorkType::UnknownBlockSamplingRequest, + Work::UnknownLightClientOptimisticUpdate { .. } => { + WorkType::UnknownLightClientOptimisticUpdate + } + Work::ApiRequestP0 { .. } => WorkType::ApiRequestP0, + Work::ApiRequestP1 { .. } => WorkType::ApiRequestP1, } } } @@ -987,7 +1008,7 @@ impl BeaconProcessor { .map_or(false, |event| event.drop_during_sync); let idle_tx = idle_tx.clone(); - match work_event { + let modified_queue_id = match work_event { // There is no new work event, but we are able to spawn a new worker. // // We don't check the `work.drop_during_sync` here. We assume that if it made @@ -995,38 +1016,40 @@ impl BeaconProcessor { None if can_spawn => { // Check for chain segments first, they're the most efficient way to get // blocks into the system. 
- if let Some(item) = chain_segment_queue.pop() { - self.spawn_worker(item, idle_tx); + let work_event: Option<Work<E>> = if let Some(item) = + chain_segment_queue.pop() + { + Some(item) // Check sync blocks before gossip blocks, since we've already explicitly // requested these blocks. } else if let Some(item) = rpc_block_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) } else if let Some(item) = rpc_blob_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) } else if let Some(item) = rpc_custody_column_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) // TODO(das): decide proper prioritization for sampling columns } else if let Some(item) = rpc_custody_column_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) } else if let Some(item) = rpc_verify_data_column_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) } else if let Some(item) = sampling_result_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) // Check delayed blocks before gossip blocks, the gossip blocks might rely // on the delayed ones. } else if let Some(item) = delayed_block_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) // Check gossip blocks before gossip attestations, since a block might be // required to verify some attestations. } else if let Some(item) = gossip_block_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) } else if let Some(item) = gossip_blob_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) } else if let Some(item) = gossip_data_column_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) // Check the priority 0 API requests after blocks and blobs, but before attestations.
} else if let Some(item) = api_request_p0_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) // Check the aggregates, *then* the unaggregates since we assume that // aggregates are more valuable to local validators and effectively give us // more information with less signature verification time. @@ -1038,9 +1061,7 @@ impl BeaconProcessor { if batch_size < 2 { // One single aggregate is in the queue, process it individually. - if let Some(item) = aggregate_queue.pop() { - self.spawn_worker(item, idle_tx); - } + aggregate_queue.pop() } else { // Collect two or more aggregates into a batch, so they can take // advantage of batch signature verification. @@ -1071,13 +1092,10 @@ impl BeaconProcessor { if let Some(process_batch) = process_batch_opt { // Process all aggregates with a single worker. - self.spawn_worker( - Work::GossipAggregateBatch { - aggregates, - process_batch, - }, - idle_tx, - ) + Some(Work::GossipAggregateBatch { + aggregates, + process_batch, + }) } else { // There is no good reason for this to // happen, it is a serious logic error. @@ -1085,6 +1103,7 @@ impl BeaconProcessor { // work items exist, we should always have a // work closure at this point. crit!(self.log, "Missing aggregate work"); + None } } // Check the unaggregated attestation queue. @@ -1098,9 +1117,7 @@ impl BeaconProcessor { if batch_size < 2 { // One single attestation is in the queue, process it individually. - if let Some(item) = attestation_queue.pop() { - self.spawn_worker(item, idle_tx); - } + attestation_queue.pop() } else { // Collect two or more attestations into a batch, so they can take // advantage of batch signature verification. @@ -1132,13 +1149,10 @@ impl BeaconProcessor { if let Some(process_batch) = process_batch_opt { // Process all attestations with a single worker. 
- self.spawn_worker( - Work::GossipAttestationBatch { - attestations, - process_batch, - }, - idle_tx, - ) + Some(Work::GossipAttestationBatch { + attestations, + process_batch, + }) } else { // There is no good reason for this to // happen, it is a serious logic error. @@ -1146,71 +1160,72 @@ impl BeaconProcessor { // work items exist, we should always have a // work closure at this point. crit!(self.log, "Missing attestations work"); + None } } // Check sync committee messages after attestations as their rewards are lesser // and they don't influence fork choice. } else if let Some(item) = sync_contribution_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) } else if let Some(item) = sync_message_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) // Aggregates and unaggregates queued for re-processing are older and we // care about fresher ones, so check those first. } else if let Some(item) = unknown_block_aggregate_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) } else if let Some(item) = unknown_block_attestation_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) // Check RPC methods next. 
Status messages are needed for sync so // prioritize them over syncing requests from other peers (BlocksByRange // and BlocksByRoot) } else if let Some(item) = status_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) } else if let Some(item) = bbrange_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) } else if let Some(item) = bbroots_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) } else if let Some(item) = blbrange_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) } else if let Some(item) = blbroots_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) } else if let Some(item) = dcbroots_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) } else if let Some(item) = dcbrange_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) // Prioritize sampling requests after block syncing requests } else if let Some(item) = unknown_block_sampling_request_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) // Check slashings after all other consensus messages so we prioritize // following head. // // Check attester slashings before proposer slashings since they have the // potential to slash multiple validators at once. } else if let Some(item) = gossip_attester_slashing_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) } else if let Some(item) = gossip_proposer_slashing_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) // Check exits and address changes late since our validators don't get // rewards from them. } else if let Some(item) = gossip_voluntary_exit_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) } else if let Some(item) = gossip_bls_to_execution_change_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) // Check the priority 1 API requests after we've // processed all the interesting things from the network // and things required for us to stay in good repute // with our P2P peers. 
} else if let Some(item) = api_request_p1_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) // Handle backfill sync chain segments. } else if let Some(item) = backfill_chain_segment.pop() { - self.spawn_worker(item, idle_tx); + Some(item) // Handle light client requests. } else if let Some(item) = lc_bootstrap_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) } else if let Some(item) = lc_optimistic_update_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) } else if let Some(item) = lc_finality_update_queue.pop() { - self.spawn_worker(item, idle_tx); + Some(item) // This statement should always be the final else statement. } else { // Let the journal know that a worker is freed and there's nothing else @@ -1220,6 +1235,15 @@ impl BeaconProcessor { // during testing. let _ = work_journal_tx.try_send(NOTHING_TO_DO); } + None + }; + + if let Some(work_event) = work_event { + let work_type = work_event.to_type(); + self.spawn_worker(work_event, idle_tx); + Some(work_type) + } else { + None } } // There is no new work event and we are unable to spawn a new worker. @@ -1231,6 +1255,7 @@ impl BeaconProcessor { "Unexpected gossip processor condition"; "msg" => "no new work and cannot spawn worker" ); + None } // The chain is syncing and this event should be dropped during sync. Some(work_event) @@ -1248,11 +1273,13 @@ impl BeaconProcessor { "msg" => "chain is syncing", "work_id" => work_id ); + None } // There is a new work event and the chain is not syncing. Process it or queue // it. Some(WorkEvent { work, .. }) => { let work_id = work.str_id(); + let work_type = work.to_type(); match work { _ if can_spawn => self.spawn_worker(work, idle_tx), @@ -1371,94 +1398,76 @@ impl BeaconProcessor { Work::ApiRequestP1 { .. 
} => { api_request_p1_queue.push(work, work_id, &self.log) } - } + }; + Some(work_type) } - } + }; metrics::set_gauge( &metrics::BEACON_PROCESSOR_WORKERS_ACTIVE_TOTAL, self.current_workers as i64, ); - metrics::set_gauge( - &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_QUEUE_TOTAL, - attestation_queue.len() as i64, - ); - metrics::set_gauge( - &metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_QUEUE_TOTAL, - aggregate_queue.len() as i64, - ); - metrics::set_gauge( - &metrics::BEACON_PROCESSOR_SYNC_MESSAGE_QUEUE_TOTAL, - sync_message_queue.len() as i64, - ); - metrics::set_gauge( - &metrics::BEACON_PROCESSOR_SYNC_CONTRIBUTION_QUEUE_TOTAL, - sync_contribution_queue.len() as i64, - ); - metrics::set_gauge( - &metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_QUEUE_TOTAL, - gossip_block_queue.len() as i64, - ); - metrics::set_gauge( - &metrics::BEACON_PROCESSOR_GOSSIP_BLOB_QUEUE_TOTAL, - gossip_blob_queue.len() as i64, - ); - metrics::set_gauge( - &metrics::BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_QUEUE_TOTAL, - gossip_data_column_queue.len() as i64, - ); - metrics::set_gauge( - &metrics::BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL, - rpc_block_queue.len() as i64, - ); - metrics::set_gauge( - &metrics::BEACON_PROCESSOR_RPC_BLOB_QUEUE_TOTAL, - rpc_blob_queue.len() as i64, - ); - metrics::set_gauge( - &metrics::BEACON_PROCESSOR_RPC_CUSTODY_COLUMN_QUEUE_TOTAL, - rpc_custody_column_queue.len() as i64, - ); - metrics::set_gauge( - &metrics::BEACON_PROCESSOR_RPC_VERIFY_DATA_COLUMN_QUEUE_TOTAL, - rpc_verify_data_column_queue.len() as i64, - ); - metrics::set_gauge( - &metrics::BEACON_PROCESSOR_SAMPLING_RESULT_QUEUE_TOTAL, - sampling_result_queue.len() as i64, - ); - metrics::set_gauge( - &metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL, - chain_segment_queue.len() as i64, - ); - metrics::set_gauge( - &metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_QUEUE_TOTAL, - backfill_chain_segment.len() as i64, - ); - metrics::set_gauge( - &metrics::BEACON_PROCESSOR_EXIT_QUEUE_TOTAL, - 
gossip_voluntary_exit_queue.len() as i64, - ); - metrics::set_gauge( - &metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_QUEUE_TOTAL, - gossip_proposer_slashing_queue.len() as i64, - ); - metrics::set_gauge( - &metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_QUEUE_TOTAL, - gossip_attester_slashing_queue.len() as i64, - ); - metrics::set_gauge( - &metrics::BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_QUEUE_TOTAL, - gossip_bls_to_execution_change_queue.len() as i64, - ); - metrics::set_gauge( - &metrics::BEACON_PROCESSOR_API_REQUEST_P0_QUEUE_TOTAL, - api_request_p0_queue.len() as i64, - ); - metrics::set_gauge( - &metrics::BEACON_PROCESSOR_API_REQUEST_P1_QUEUE_TOTAL, - api_request_p1_queue.len() as i64, - ); + + if let Some(modified_queue_id) = modified_queue_id { + let queue_len = match modified_queue_id { + WorkType::GossipAttestation => attestation_queue.len(), + WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(), + WorkType::GossipAttestationBatch => 0, // No queue + WorkType::GossipAggregate => aggregate_queue.len(), + WorkType::UnknownBlockAggregate => unknown_block_aggregate_queue.len(), + WorkType::UnknownLightClientOptimisticUpdate => { + unknown_light_client_update_queue.len() + } + WorkType::UnknownBlockSamplingRequest => { + unknown_block_sampling_request_queue.len() + } + WorkType::GossipAggregateBatch => 0, // No queue + WorkType::GossipBlock => gossip_block_queue.len(), + WorkType::GossipBlobSidecar => gossip_blob_queue.len(), + WorkType::GossipDataColumnSidecar => gossip_data_column_queue.len(), + WorkType::DelayedImportBlock => delayed_block_queue.len(), + WorkType::GossipVoluntaryExit => gossip_voluntary_exit_queue.len(), + WorkType::GossipProposerSlashing => gossip_proposer_slashing_queue.len(), + WorkType::GossipAttesterSlashing => gossip_attester_slashing_queue.len(), + WorkType::GossipSyncSignature => sync_message_queue.len(), + WorkType::GossipSyncContribution => sync_contribution_queue.len(), +
WorkType::GossipLightClientFinalityUpdate => finality_update_queue.len(), + WorkType::GossipLightClientOptimisticUpdate => { + optimistic_update_queue.len() + } + WorkType::RpcBlock => rpc_block_queue.len(), + WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(), + WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(), + WorkType::RpcVerifyDataColumn => rpc_verify_data_column_queue.len(), + WorkType::SamplingResult => sampling_result_queue.len(), + WorkType::ChainSegment => chain_segment_queue.len(), + WorkType::ChainSegmentBackfill => backfill_chain_segment.len(), + WorkType::Status => status_queue.len(), + WorkType::BlocksByRangeRequest => bbrange_queue.len(), + WorkType::BlocksByRootsRequest => bbroots_queue.len(), + WorkType::BlobsByRangeRequest => blbrange_queue.len(), + WorkType::BlobsByRootsRequest => blbroots_queue.len(), + WorkType::DataColumnsByRootsRequest => dcbroots_queue.len(), + WorkType::DataColumnsByRangeRequest => dcbrange_queue.len(), + WorkType::GossipBlsToExecutionChange => { + gossip_bls_to_execution_change_queue.len() + } + WorkType::LightClientBootstrapRequest => lc_bootstrap_queue.len(), + WorkType::LightClientOptimisticUpdateRequest => { + lc_optimistic_update_queue.len() + } + WorkType::LightClientFinalityUpdateRequest => { + lc_finality_update_queue.len() + } + WorkType::ApiRequestP0 => api_request_p0_queue.len(), + WorkType::ApiRequestP1 => api_request_p1_queue.len(), + }; + metrics::observe_vec( + &metrics::BEACON_PROCESSOR_QUEUE_LENGTH, + &[modified_queue_id.into()], + queue_len as f64, + ); + } if aggregate_queue.is_full() && aggregate_debounce.elapsed() { error!( diff --git a/beacon_node/beacon_processor/src/metrics.rs b/beacon_node/beacon_processor/src/metrics.rs index 8bc03cee6c..0a7bdba18d 100644 --- a/beacon_node/beacon_processor/src/metrics.rs +++ b/beacon_node/beacon_processor/src/metrics.rs @@ -62,163 +62,16 @@ pub static BEACON_PROCESSOR_EVENT_HANDLING_SECONDS: LazyLock> "Time spent handling a new
message and allocating it to a queue or worker.", ) }); -// Gossip blocks. -pub static BEACON_PROCESSOR_GOSSIP_BLOCK_QUEUE_TOTAL: LazyLock> = - LazyLock::new(|| { - try_create_int_gauge( - "beacon_processor_gossip_block_queue_total", - "Count of blocks from gossip waiting to be verified.", - ) - }); -// Gossip blobs. -pub static BEACON_PROCESSOR_GOSSIP_BLOB_QUEUE_TOTAL: LazyLock> = - LazyLock::new(|| { - try_create_int_gauge( - "beacon_processor_gossip_blob_queue_total", - "Count of blobs from gossip waiting to be verified.", - ) - }); -// Gossip data column sidecars. -pub static BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_QUEUE_TOTAL: LazyLock> = - LazyLock::new(|| { - try_create_int_gauge( - "beacon_processor_gossip_data_column_queue_total", - "Count of data column sidecars from gossip waiting to be verified.", - ) - }); -// Gossip Exits. -pub static BEACON_PROCESSOR_EXIT_QUEUE_TOTAL: LazyLock> = LazyLock::new(|| { - try_create_int_gauge( - "beacon_processor_exit_queue_total", - "Count of exits from gossip waiting to be verified.", +pub static BEACON_PROCESSOR_QUEUE_LENGTH: LazyLock> = LazyLock::new(|| { + try_create_histogram_vec_with_buckets( + "beacon_processor_work_event_queue_length", + "Count of work events in queue waiting to be processed.", + Ok(vec![ + 0.0, 1.0, 4.0, 16.0, 64.0, 256.0, 1024.0, 4096.0, 16384.0, 65536.0, + ]), + &["type"], ) }); -// Gossip proposer slashings. -pub static BEACON_PROCESSOR_PROPOSER_SLASHING_QUEUE_TOTAL: LazyLock> = - LazyLock::new(|| { - try_create_int_gauge( - "beacon_processor_proposer_slashing_queue_total", - "Count of proposer slashings from gossip waiting to be verified.", - ) - }); -// Gossip attester slashings. -pub static BEACON_PROCESSOR_ATTESTER_SLASHING_QUEUE_TOTAL: LazyLock> = - LazyLock::new(|| { - try_create_int_gauge( - "beacon_processor_attester_slashing_queue_total", - "Count of attester slashings from gossip waiting to be verified.", - ) - }); -// Gossip BLS to execution changes. 
-pub static BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_QUEUE_TOTAL: LazyLock> = - LazyLock::new(|| { - try_create_int_gauge( - "beacon_processor_bls_to_execution_change_queue_total", - "Count of address changes from gossip waiting to be verified.", - ) - }); -// Rpc blocks. -pub static BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL: LazyLock> = - LazyLock::new(|| { - try_create_int_gauge( - "beacon_processor_rpc_block_queue_total", - "Count of blocks from the rpc waiting to be verified.", - ) - }); -// Rpc blobs. -pub static BEACON_PROCESSOR_RPC_BLOB_QUEUE_TOTAL: LazyLock> = - LazyLock::new(|| { - try_create_int_gauge( - "beacon_processor_rpc_blob_queue_total", - "Count of blobs from the rpc waiting to be verified.", - ) - }); -// Rpc custody data columns. -pub static BEACON_PROCESSOR_RPC_CUSTODY_COLUMN_QUEUE_TOTAL: LazyLock> = - LazyLock::new(|| { - try_create_int_gauge( - "beacon_processor_rpc_custody_column_queue_total", - "Count of custody columns from the rpc waiting to be imported.", - ) - }); -// Rpc verify data columns -pub static BEACON_PROCESSOR_RPC_VERIFY_DATA_COLUMN_QUEUE_TOTAL: LazyLock> = - LazyLock::new(|| { - try_create_int_gauge( - "beacon_processor_rpc_verify_data_column_queue_total", - "Count of data columns from the rpc waiting to be verified.", - ) - }); -// Sampling result -pub static BEACON_PROCESSOR_SAMPLING_RESULT_QUEUE_TOTAL: LazyLock> = - LazyLock::new(|| { - try_create_int_gauge( - "beacon_processor_sampling_result_queue_total", - "Count of sampling results waiting to be processed.", - ) - }); -// Chain segments. 
-pub static BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL: LazyLock> = - LazyLock::new(|| { - try_create_int_gauge( - "beacon_processor_chain_segment_queue_total", - "Count of chain segments from the rpc waiting to be verified.", - ) - }); -pub static BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_QUEUE_TOTAL: LazyLock> = - LazyLock::new(|| { - try_create_int_gauge( - "beacon_processor_backfill_chain_segment_queue_total", - "Count of backfill chain segments from the rpc waiting to be verified.", - ) - }); -// Unaggregated attestations. -pub static BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_QUEUE_TOTAL: LazyLock> = - LazyLock::new(|| { - try_create_int_gauge( - "beacon_processor_unaggregated_attestation_queue_total", - "Count of unagg. attestations waiting to be processed.", - ) - }); -// Aggregated attestations. -pub static BEACON_PROCESSOR_AGGREGATED_ATTESTATION_QUEUE_TOTAL: LazyLock> = - LazyLock::new(|| { - try_create_int_gauge( - "beacon_processor_aggregated_attestation_queue_total", - "Count of agg. attestations waiting to be processed.", - ) - }); -// Sync committee messages. -pub static BEACON_PROCESSOR_SYNC_MESSAGE_QUEUE_TOTAL: LazyLock> = - LazyLock::new(|| { - try_create_int_gauge( - "beacon_processor_sync_message_queue_total", - "Count of sync committee messages waiting to be processed.", - ) - }); -// Sync contribution. -pub static BEACON_PROCESSOR_SYNC_CONTRIBUTION_QUEUE_TOTAL: LazyLock> = - LazyLock::new(|| { - try_create_int_gauge( - "beacon_processor_sync_contribution_queue_total", - "Count of sync committee contributions waiting to be processed.", - ) - }); -// HTTP API requests. 
-pub static BEACON_PROCESSOR_API_REQUEST_P0_QUEUE_TOTAL: LazyLock> = - LazyLock::new(|| { - try_create_int_gauge( - "beacon_processor_api_request_p0_queue_total", - "Count of P0 HTTP requesets waiting to be processed.", - ) - }); -pub static BEACON_PROCESSOR_API_REQUEST_P1_QUEUE_TOTAL: LazyLock> = - LazyLock::new(|| { - try_create_int_gauge( - "beacon_processor_api_request_p1_queue_total", - "Count of P1 HTTP requesets waiting to be processed.", - ) - }); /* * Attestation reprocessing queue metrics. diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 40c69a0baa..391175ccd4 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -467,10 +467,11 @@ impl TestRig { /// /// Given the described logic, `expected` must not contain `WORKER_FREED` or `NOTHING_TO_DO` /// events. - pub async fn assert_event_journal_contains_ordered(&mut self, expected: &[&str]) { - assert!(expected + pub async fn assert_event_journal_contains_ordered(&mut self, expected: &[WorkType]) { + let expected = expected .iter() - .all(|ev| ev != &WORKER_FREED && ev != &NOTHING_TO_DO)); + .map(|ev| ev.into()) + .collect::>(); let mut events = Vec::with_capacity(expected.len()); let mut worker_freed_remaining = expected.len(); @@ -517,6 +518,18 @@ impl TestRig { .await } + pub async fn assert_event_journal_completes(&mut self, expected: &[WorkType]) { + self.assert_event_journal( + &expected + .iter() + .map(|ev| Into::<&'static str>::into(ev)) + .chain(std::iter::once(WORKER_FREED)) + .chain(std::iter::once(NOTHING_TO_DO)) + .collect::>(), + ) + .await + } + /// Assert that the `BeaconProcessor` event journal is as `expected`. 
/// /// ## Note @@ -587,13 +600,13 @@ async fn import_gossip_block_acceptably_early() { rig.enqueue_gossip_block(); - rig.assert_event_journal(&[GOSSIP_BLOCK, WORKER_FREED, NOTHING_TO_DO]) + rig.assert_event_journal_completes(&[WorkType::GossipBlock]) .await; let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0); for i in 0..num_blobs { rig.enqueue_gossip_blob(i); - rig.assert_event_journal(&[GOSSIP_BLOBS_SIDECAR, WORKER_FREED, NOTHING_TO_DO]) + rig.assert_event_journal_completes(&[WorkType::GossipBlobSidecar]) .await; } @@ -611,7 +624,7 @@ async fn import_gossip_block_acceptably_early() { "block not yet imported" ); - rig.assert_event_journal(&[DELAYED_IMPORT_BLOCK, WORKER_FREED, NOTHING_TO_DO]) + rig.assert_event_journal_completes(&[WorkType::DelayedImportBlock]) .await; assert_eq!( @@ -644,7 +657,7 @@ async fn import_gossip_block_unacceptably_early() { rig.enqueue_gossip_block(); - rig.assert_event_journal(&[GOSSIP_BLOCK, WORKER_FREED, NOTHING_TO_DO]) + rig.assert_event_journal_completes(&[WorkType::GossipBlock]) .await; // Waiting for 5 seconds is a bit arbitrary, however it *should* be long enough to ensure the @@ -670,7 +683,7 @@ async fn import_gossip_block_at_current_slot() { rig.enqueue_gossip_block(); - rig.assert_event_journal(&[GOSSIP_BLOCK, WORKER_FREED, NOTHING_TO_DO]) + rig.assert_event_journal_completes(&[WorkType::GossipBlock]) .await; let num_blobs = rig @@ -682,7 +695,7 @@ async fn import_gossip_block_at_current_slot() { for i in 0..num_blobs { rig.enqueue_gossip_blob(i); - rig.assert_event_journal(&[GOSSIP_BLOBS_SIDECAR, WORKER_FREED, NOTHING_TO_DO]) + rig.assert_event_journal_completes(&[WorkType::GossipBlobSidecar]) .await; } @@ -702,7 +715,7 @@ async fn import_gossip_attestation() { rig.enqueue_unaggregated_attestation(); - rig.assert_event_journal(&[GOSSIP_ATTESTATION, WORKER_FREED, NOTHING_TO_DO]) + rig.assert_event_journal_completes(&[WorkType::GossipAttestation]) .await; assert_eq!( @@ -728,7 +741,7 @@ async fn 
attestation_to_unknown_block_processed(import_method: BlockImportMethod rig.enqueue_next_block_unaggregated_attestation(); - rig.assert_event_journal(&[GOSSIP_ATTESTATION, WORKER_FREED, NOTHING_TO_DO]) + rig.assert_event_journal_completes(&[WorkType::GossipAttestation]) .await; assert_eq!( @@ -747,23 +760,23 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod match import_method { BlockImportMethod::Gossip => { rig.enqueue_gossip_block(); - events.push(GOSSIP_BLOCK); + events.push(WorkType::GossipBlock); for i in 0..num_blobs { rig.enqueue_gossip_blob(i); - events.push(GOSSIP_BLOBS_SIDECAR); + events.push(WorkType::GossipBlobSidecar); } } BlockImportMethod::Rpc => { rig.enqueue_rpc_block(); - events.push(RPC_BLOCK); + events.push(WorkType::RpcBlock); if num_blobs > 0 { rig.enqueue_single_lookup_rpc_blobs(); - events.push(RPC_BLOBS); + events.push(WorkType::RpcBlobs); } } }; - events.push(UNKNOWN_BLOCK_ATTESTATION); + events.push(WorkType::UnknownBlockAttestation); rig.assert_event_journal_contains_ordered(&events).await; @@ -809,7 +822,7 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod rig.enqueue_next_block_aggregated_attestation(); - rig.assert_event_journal(&[GOSSIP_AGGREGATE, WORKER_FREED, NOTHING_TO_DO]) + rig.assert_event_journal_completes(&[WorkType::GossipAggregate]) .await; assert_eq!( @@ -828,23 +841,23 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod match import_method { BlockImportMethod::Gossip => { rig.enqueue_gossip_block(); - events.push(GOSSIP_BLOCK); + events.push(WorkType::GossipBlock); for i in 0..num_blobs { rig.enqueue_gossip_blob(i); - events.push(GOSSIP_BLOBS_SIDECAR); + events.push(WorkType::GossipBlobSidecar); } } BlockImportMethod::Rpc => { rig.enqueue_rpc_block(); - events.push(RPC_BLOCK); + events.push(WorkType::RpcBlock); if num_blobs > 0 { rig.enqueue_single_lookup_rpc_blobs(); - events.push(RPC_BLOBS); + 
events.push(WorkType::RpcBlobs); } } }; - events.push(UNKNOWN_BLOCK_AGGREGATE); + events.push(WorkType::UnknownBlockAggregate); rig.assert_event_journal_contains_ordered(&events).await; @@ -887,7 +900,7 @@ async fn requeue_unknown_block_gossip_attestation_without_import() { rig.enqueue_next_block_unaggregated_attestation(); - rig.assert_event_journal(&[GOSSIP_ATTESTATION, WORKER_FREED, NOTHING_TO_DO]) + rig.assert_event_journal_completes(&[WorkType::GossipAttestation]) .await; assert_eq!( @@ -899,7 +912,11 @@ async fn requeue_unknown_block_gossip_attestation_without_import() { // Ensure that the attestation is received back but not imported. rig.assert_event_journal_with_timeout( - &[UNKNOWN_BLOCK_ATTESTATION, WORKER_FREED, NOTHING_TO_DO], + &[ + WorkType::UnknownBlockAttestation.into(), + WORKER_FREED, + NOTHING_TO_DO, + ], Duration::from_secs(1) + QUEUED_ATTESTATION_DELAY, ) .await; @@ -923,7 +940,7 @@ async fn requeue_unknown_block_gossip_aggregated_attestation_without_import() { rig.enqueue_next_block_aggregated_attestation(); - rig.assert_event_journal(&[GOSSIP_AGGREGATE, WORKER_FREED, NOTHING_TO_DO]) + rig.assert_event_journal_completes(&[WorkType::GossipAggregate]) .await; assert_eq!( @@ -935,7 +952,11 @@ async fn requeue_unknown_block_gossip_aggregated_attestation_without_import() { // Ensure that the attestation is received back but not imported. 
rig.assert_event_journal_with_timeout( - &[UNKNOWN_BLOCK_AGGREGATE, WORKER_FREED, NOTHING_TO_DO], + &[ + WorkType::UnknownBlockAggregate.into(), + WORKER_FREED, + NOTHING_TO_DO, + ], Duration::from_secs(1) + QUEUED_ATTESTATION_DELAY, ) .await; @@ -961,7 +982,7 @@ async fn import_misc_gossip_ops() { rig.enqueue_gossip_attester_slashing(); - rig.assert_event_journal(&[GOSSIP_ATTESTER_SLASHING, WORKER_FREED, NOTHING_TO_DO]) + rig.assert_event_journal_completes(&[WorkType::GossipAttesterSlashing]) .await; assert_eq!( @@ -978,7 +999,7 @@ async fn import_misc_gossip_ops() { rig.enqueue_gossip_proposer_slashing(); - rig.assert_event_journal(&[GOSSIP_PROPOSER_SLASHING, WORKER_FREED, NOTHING_TO_DO]) + rig.assert_event_journal_completes(&[WorkType::GossipProposerSlashing]) .await; assert_eq!( @@ -995,7 +1016,7 @@ async fn import_misc_gossip_ops() { rig.enqueue_gossip_voluntary_exit(); - rig.assert_event_journal(&[GOSSIP_VOLUNTARY_EXIT, WORKER_FREED, NOTHING_TO_DO]) + rig.assert_event_journal_completes(&[WorkType::GossipVoluntaryExit]) .await; assert_eq!( @@ -1014,12 +1035,12 @@ async fn test_rpc_block_reprocessing() { // Insert the next block into the duplicate cache manually let handle = rig.duplicate_cache.check_and_insert(next_block_root); rig.enqueue_single_lookup_rpc_block(); - rig.assert_event_journal(&[RPC_BLOCK, WORKER_FREED, NOTHING_TO_DO]) + rig.assert_event_journal_completes(&[WorkType::RpcBlock]) .await; rig.enqueue_single_lookup_rpc_blobs(); if rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0) > 0 { - rig.assert_event_journal(&[RPC_BLOBS, WORKER_FREED, NOTHING_TO_DO]) + rig.assert_event_journal_completes(&[WorkType::RpcBlobs]) .await; } @@ -1033,7 +1054,7 @@ async fn test_rpc_block_reprocessing() { // the specified delay. 
tokio::time::sleep(QUEUED_RPC_BLOCK_DELAY).await; - rig.assert_event_journal(&[RPC_BLOCK]).await; + rig.assert_event_journal(&[WorkType::RpcBlock.into()]).await; // Add an extra delay for block processing tokio::time::sleep(Duration::from_millis(10)).await; // head should update to next block now since the duplicate @@ -1055,7 +1076,11 @@ async fn test_backfill_sync_processing() { rig.assert_no_events_for(Duration::from_millis(100)).await; // A new batch should be processed within a slot. rig.assert_event_journal_with_timeout( - &[CHAIN_SEGMENT_BACKFILL, WORKER_FREED, NOTHING_TO_DO], + &[ + WorkType::ChainSegmentBackfill.into(), + WORKER_FREED, + NOTHING_TO_DO, + ], rig.chain.slot_clock.slot_duration(), ) .await; @@ -1075,9 +1100,9 @@ async fn test_backfill_sync_processing_rate_limiting_disabled() { // ensure all batches are processed rig.assert_event_journal_with_timeout( &[ - CHAIN_SEGMENT_BACKFILL, - CHAIN_SEGMENT_BACKFILL, - CHAIN_SEGMENT_BACKFILL, + WorkType::ChainSegmentBackfill.into(), + WorkType::ChainSegmentBackfill.into(), + WorkType::ChainSegmentBackfill.into(), ], Duration::from_millis(100), ) diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index a5e27f582a..26c1d14f02 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -710,7 +710,7 @@ impl Router { if let Err(e) = result { let work_type = match &e { mpsc::error::TrySendError::Closed(work) | mpsc::error::TrySendError::Full(work) => { - work.work_type() + work.work_type_str() } }; diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 6d852b2572..a8a7ad5849 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -1032,17 +1032,17 @@ impl TestRig { match response_type { ResponseType::Block => self .pop_received_processor_event(|ev| { - (ev.work_type() == 
beacon_processor::RPC_BLOCK).then_some(()) + (ev.work_type() == beacon_processor::WorkType::RpcBlock).then_some(()) }) .unwrap_or_else(|e| panic!("Expected block work event: {e}")), ResponseType::Blob => self .pop_received_processor_event(|ev| { - (ev.work_type() == beacon_processor::RPC_BLOBS).then_some(()) + (ev.work_type() == beacon_processor::WorkType::RpcBlobs).then_some(()) }) .unwrap_or_else(|e| panic!("Expected blobs work event: {e}")), ResponseType::CustodyColumn => self .pop_received_processor_event(|ev| { - (ev.work_type() == beacon_processor::RPC_CUSTODY_COLUMN).then_some(()) + (ev.work_type() == beacon_processor::WorkType::RpcCustodyColumn).then_some(()) }) .unwrap_or_else(|e| panic!("Expected column work event: {e}")), } @@ -1050,7 +1050,7 @@ impl TestRig { fn expect_rpc_custody_column_work_event(&mut self) { self.pop_received_processor_event(|ev| { - if ev.work_type() == beacon_processor::RPC_CUSTODY_COLUMN { + if ev.work_type() == beacon_processor::WorkType::RpcCustodyColumn { Some(()) } else { None @@ -1061,7 +1061,7 @@ impl TestRig { fn expect_rpc_sample_verify_work_event(&mut self) { self.pop_received_processor_event(|ev| { - if ev.work_type() == beacon_processor::RPC_VERIFY_DATA_COLUMNS { + if ev.work_type() == beacon_processor::WorkType::RpcVerifyDataColumn { Some(()) } else { None @@ -1072,7 +1072,7 @@ impl TestRig { fn expect_sampling_result_work(&mut self) { self.pop_received_processor_event(|ev| { - if ev.work_type() == beacon_processor::SAMPLING_RESULT { + if ev.work_type() == beacon_processor::WorkType::SamplingResult { Some(()) } else { None @@ -1103,7 +1103,7 @@ impl TestRig { match self.beacon_processor_rx.try_recv() { Ok(work) => { // Parent chain sends blocks one by one - assert_eq!(work.work_type(), beacon_processor::RPC_BLOCK); + assert_eq!(work.work_type(), beacon_processor::WorkType::RpcBlock); } other => panic!( "Expected rpc_block from chain segment process, found {:?}", diff --git 
a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index beb04fac28..28dea8e4b5 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -652,7 +652,10 @@ mod tests { fn expect_empty_processor(&mut self) { match self.beacon_processor_rx.try_recv() { Ok(work) => { - panic!("Expected empty processor. Instead got {}", work.work_type()); + panic!( + "Expected empty processor. Instead got {}", + work.work_type_str() + ); } Err(e) => match e { mpsc::error::TryRecvError::Empty => {} @@ -665,7 +668,7 @@ mod tests { fn expect_chain_segment(&mut self) { match self.beacon_processor_rx.try_recv() { Ok(work) => { - assert_eq!(work.work_type(), beacon_processor::CHAIN_SEGMENT); + assert_eq!(work.work_type(), beacon_processor::WorkType::ChainSegment); } other => panic!("Expected chain segment process, found {:?}", other), } diff --git a/common/lighthouse_metrics/src/lib.rs b/common/lighthouse_metrics/src/lib.rs index f52913dd00..2a1e99defa 100644 --- a/common/lighthouse_metrics/src/lib.rs +++ b/common/lighthouse_metrics/src/lib.rs @@ -283,6 +283,12 @@ pub fn stop_timer(timer: Option) { } } +pub fn observe_vec(vec: &Result, name: &[&str], value: f64) { + if let Some(h) = get_histogram(vec, name) { + h.observe(value) + } +} + pub fn inc_counter(counter: &Result) { if let Ok(counter) = counter { counter.inc();