Register processor queue length as histogram (#6012)

* Register processor queue length as histogram

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into processor-queue-histogram
This commit is contained in:
Lion - dapplion
2024-09-09 14:18:30 +02:00
committed by GitHub
parent 815567a91a
commit 51091a40fa
7 changed files with 319 additions and 423 deletions

View File

@@ -57,6 +57,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::time::Duration;
use strum::IntoStaticStr;
use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
@@ -219,46 +220,6 @@ const DEFAULT_MAX_GOSSIP_AGGREGATE_BATCH_SIZE: usize = 64;
/// Unique IDs used for metrics and testing.
pub const WORKER_FREED: &str = "worker_freed";
pub const NOTHING_TO_DO: &str = "nothing_to_do";
pub const GOSSIP_ATTESTATION: &str = "gossip_attestation";
pub const GOSSIP_ATTESTATION_BATCH: &str = "gossip_attestation_batch";
pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate";
pub const GOSSIP_AGGREGATE_BATCH: &str = "gossip_aggregate_batch";
pub const GOSSIP_BLOCK: &str = "gossip_block";
pub const GOSSIP_BLOBS_SIDECAR: &str = "gossip_blobs_sidecar";
pub const GOSSIP_BLOBS_COLUMN_SIDECAR: &str = "gossip_blobs_column_sidecar";
pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block";
pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit";
pub const GOSSIP_PROPOSER_SLASHING: &str = "gossip_proposer_slashing";
pub const GOSSIP_ATTESTER_SLASHING: &str = "gossip_attester_slashing";
pub const GOSSIP_SYNC_SIGNATURE: &str = "gossip_sync_signature";
pub const GOSSIP_SYNC_CONTRIBUTION: &str = "gossip_sync_contribution";
pub const GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update";
pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update";
pub const RPC_BLOCK: &str = "rpc_block";
pub const IGNORED_RPC_BLOCK: &str = "ignored_rpc_block";
pub const RPC_BLOBS: &str = "rpc_blob";
pub const RPC_CUSTODY_COLUMN: &str = "rpc_custody_column";
pub const RPC_VERIFY_DATA_COLUMNS: &str = "rpc_verify_data_columns";
pub const SAMPLING_RESULT: &str = "sampling_result";
pub const CHAIN_SEGMENT: &str = "chain_segment";
pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill";
pub const STATUS_PROCESSING: &str = "status_processing";
pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request";
pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request";
pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request";
pub const BLOBS_BY_ROOTS_REQUEST: &str = "blobs_by_roots_request";
pub const DATA_COLUMNS_BY_ROOTS_REQUEST: &str = "data_columns_by_roots_request";
pub const DATA_COLUMNS_BY_RANGE_REQUEST: &str = "data_columns_by_range_request";
pub const LIGHT_CLIENT_BOOTSTRAP_REQUEST: &str = "light_client_bootstrap";
pub const LIGHT_CLIENT_FINALITY_UPDATE_REQUEST: &str = "light_client_finality_update_request";
pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE_REQUEST: &str = "light_client_optimistic_update_request";
pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update";
pub const UNKNOWN_BLOCK_SAMPLING_REQUEST: &str = "unknown_block_sampling_request";
pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change";
pub const API_REQUEST_P0: &str = "api_request_p0";
pub const API_REQUEST_P1: &str = "api_request_p1";
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct BeaconProcessorConfig {
@@ -454,9 +415,14 @@ pub struct WorkEvent<E: EthSpec> {
}
impl<E: EthSpec> WorkEvent<E> {
/// Get a representation of the type of work this `WorkEvent` contains.
pub fn work_type(&self) -> WorkType {
self.work.to_type()
}
/// Get a `str` representation of the type of work this `WorkEvent` contains.
pub fn work_type(&self) -> &'static str {
self.work.str_id()
pub fn work_type_str(&self) -> &'static str {
self.work_type().into()
}
}
@@ -555,7 +521,7 @@ impl<E: EthSpec> BeaconProcessorSend<E> {
Err(e) => {
metrics::inc_counter_vec(
&metrics::BEACON_PROCESSOR_SEND_ERROR_PER_WORK_TYPE,
&[work_type],
&[work_type.into()],
);
Err(e)
}
@@ -651,54 +617,109 @@ pub enum Work<E: EthSpec> {
impl<E: EthSpec> fmt::Debug for Work<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.str_id())
write!(f, "{}", Into::<&'static str>::into(self.to_type()))
}
}
#[derive(IntoStaticStr, PartialEq, Eq, Debug)]
#[strum(serialize_all = "snake_case")]
pub enum WorkType {
GossipAttestation,
UnknownBlockAttestation,
GossipAttestationBatch,
GossipAggregate,
UnknownBlockAggregate,
UnknownLightClientOptimisticUpdate,
UnknownBlockSamplingRequest,
GossipAggregateBatch,
GossipBlock,
GossipBlobSidecar,
GossipDataColumnSidecar,
DelayedImportBlock,
GossipVoluntaryExit,
GossipProposerSlashing,
GossipAttesterSlashing,
GossipSyncSignature,
GossipSyncContribution,
GossipLightClientFinalityUpdate,
GossipLightClientOptimisticUpdate,
RpcBlock,
RpcBlobs,
RpcCustodyColumn,
RpcVerifyDataColumn,
SamplingResult,
IgnoredRpcBlock,
ChainSegment,
ChainSegmentBackfill,
Status,
BlocksByRangeRequest,
BlocksByRootsRequest,
BlobsByRangeRequest,
BlobsByRootsRequest,
DataColumnsByRootsRequest,
DataColumnsByRangeRequest,
GossipBlsToExecutionChange,
LightClientBootstrapRequest,
LightClientOptimisticUpdateRequest,
LightClientFinalityUpdateRequest,
ApiRequestP0,
ApiRequestP1,
}
impl<E: EthSpec> Work<E> {
/// Provides a `&str` that uniquely identifies each enum variant.
fn str_id(&self) -> &'static str {
self.to_type().into()
}
/// Provides a `&str` that uniquely identifies each enum variant.
fn to_type(&self) -> WorkType {
match self {
Work::GossipAttestation { .. } => GOSSIP_ATTESTATION,
Work::GossipAttestationBatch { .. } => GOSSIP_ATTESTATION_BATCH,
Work::GossipAggregate { .. } => GOSSIP_AGGREGATE,
Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH,
Work::GossipBlock(_) => GOSSIP_BLOCK,
Work::GossipBlobSidecar(_) => GOSSIP_BLOBS_SIDECAR,
Work::GossipDataColumnSidecar(_) => GOSSIP_BLOBS_COLUMN_SIDECAR,
Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK,
Work::GossipVoluntaryExit(_) => GOSSIP_VOLUNTARY_EXIT,
Work::GossipProposerSlashing(_) => GOSSIP_PROPOSER_SLASHING,
Work::GossipAttesterSlashing(_) => GOSSIP_ATTESTER_SLASHING,
Work::GossipSyncSignature(_) => GOSSIP_SYNC_SIGNATURE,
Work::GossipSyncContribution(_) => GOSSIP_SYNC_CONTRIBUTION,
Work::GossipLightClientFinalityUpdate(_) => GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE,
Work::GossipLightClientOptimisticUpdate(_) => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE,
Work::RpcBlock { .. } => RPC_BLOCK,
Work::RpcBlobs { .. } => RPC_BLOBS,
Work::RpcCustodyColumn { .. } => RPC_CUSTODY_COLUMN,
Work::RpcVerifyDataColumn(_) => RPC_VERIFY_DATA_COLUMNS,
Work::SamplingResult(_) => SAMPLING_RESULT,
Work::IgnoredRpcBlock { .. } => IGNORED_RPC_BLOCK,
Work::ChainSegment { .. } => CHAIN_SEGMENT,
Work::ChainSegmentBackfill(_) => CHAIN_SEGMENT_BACKFILL,
Work::Status(_) => STATUS_PROCESSING,
Work::BlocksByRangeRequest(_) => BLOCKS_BY_RANGE_REQUEST,
Work::BlocksByRootsRequest(_) => BLOCKS_BY_ROOTS_REQUEST,
Work::BlobsByRangeRequest(_) => BLOBS_BY_RANGE_REQUEST,
Work::BlobsByRootsRequest(_) => BLOBS_BY_ROOTS_REQUEST,
Work::DataColumnsByRootsRequest(_) => DATA_COLUMNS_BY_ROOTS_REQUEST,
Work::DataColumnsByRangeRequest(_) => DATA_COLUMNS_BY_RANGE_REQUEST,
Work::LightClientBootstrapRequest(_) => LIGHT_CLIENT_BOOTSTRAP_REQUEST,
Work::LightClientOptimisticUpdateRequest(_) => LIGHT_CLIENT_OPTIMISTIC_UPDATE_REQUEST,
Work::LightClientFinalityUpdateRequest(_) => LIGHT_CLIENT_FINALITY_UPDATE_REQUEST,
Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION,
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE,
Work::UnknownBlockSamplingRequest { .. } => UNKNOWN_BLOCK_SAMPLING_REQUEST,
Work::GossipBlsToExecutionChange(_) => GOSSIP_BLS_TO_EXECUTION_CHANGE,
Work::ApiRequestP0 { .. } => API_REQUEST_P0,
Work::ApiRequestP1 { .. } => API_REQUEST_P1,
Work::GossipAttestation { .. } => WorkType::GossipAttestation,
Work::GossipAttestationBatch { .. } => WorkType::GossipAttestationBatch,
Work::GossipAggregate { .. } => WorkType::GossipAggregate,
Work::GossipAggregateBatch { .. } => WorkType::GossipAggregateBatch,
Work::GossipBlock(_) => WorkType::GossipBlock,
Work::GossipBlobSidecar(_) => WorkType::GossipBlobSidecar,
Work::GossipDataColumnSidecar(_) => WorkType::GossipDataColumnSidecar,
Work::DelayedImportBlock { .. } => WorkType::DelayedImportBlock,
Work::GossipVoluntaryExit(_) => WorkType::GossipVoluntaryExit,
Work::GossipProposerSlashing(_) => WorkType::GossipProposerSlashing,
Work::GossipAttesterSlashing(_) => WorkType::GossipAttesterSlashing,
Work::GossipSyncSignature(_) => WorkType::GossipSyncSignature,
Work::GossipSyncContribution(_) => WorkType::GossipSyncContribution,
Work::GossipLightClientFinalityUpdate(_) => WorkType::GossipLightClientFinalityUpdate,
Work::GossipLightClientOptimisticUpdate(_) => {
WorkType::GossipLightClientOptimisticUpdate
}
Work::GossipBlsToExecutionChange(_) => WorkType::GossipBlsToExecutionChange,
Work::RpcBlock { .. } => WorkType::RpcBlock,
Work::RpcBlobs { .. } => WorkType::RpcBlobs,
Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn,
Work::RpcVerifyDataColumn { .. } => WorkType::RpcVerifyDataColumn,
Work::SamplingResult { .. } => WorkType::SamplingResult,
Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock,
Work::ChainSegment { .. } => WorkType::ChainSegment,
Work::ChainSegmentBackfill(_) => WorkType::ChainSegmentBackfill,
Work::Status(_) => WorkType::Status,
Work::BlocksByRangeRequest(_) => WorkType::BlocksByRangeRequest,
Work::BlocksByRootsRequest(_) => WorkType::BlocksByRootsRequest,
Work::BlobsByRangeRequest(_) => WorkType::BlobsByRangeRequest,
Work::BlobsByRootsRequest(_) => WorkType::BlobsByRootsRequest,
Work::DataColumnsByRootsRequest(_) => WorkType::DataColumnsByRootsRequest,
Work::DataColumnsByRangeRequest(_) => WorkType::DataColumnsByRangeRequest,
Work::LightClientBootstrapRequest(_) => WorkType::LightClientBootstrapRequest,
Work::LightClientOptimisticUpdateRequest(_) => {
WorkType::LightClientOptimisticUpdateRequest
}
Work::LightClientFinalityUpdateRequest(_) => WorkType::LightClientFinalityUpdateRequest,
Work::UnknownBlockAttestation { .. } => WorkType::UnknownBlockAttestation,
Work::UnknownBlockAggregate { .. } => WorkType::UnknownBlockAggregate,
Work::UnknownBlockSamplingRequest { .. } => WorkType::UnknownBlockSamplingRequest,
Work::UnknownLightClientOptimisticUpdate { .. } => {
WorkType::UnknownLightClientOptimisticUpdate
}
Work::ApiRequestP0 { .. } => WorkType::ApiRequestP0,
Work::ApiRequestP1 { .. } => WorkType::ApiRequestP1,
}
}
}
@@ -987,7 +1008,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
.map_or(false, |event| event.drop_during_sync);
let idle_tx = idle_tx.clone();
match work_event {
let modified_queue_id = match work_event {
// There is no new work event, but we are able to spawn a new worker.
//
// We don't check the `work.drop_during_sync` here. We assume that if it made
@@ -995,38 +1016,40 @@ impl<E: EthSpec> BeaconProcessor<E> {
None if can_spawn => {
// Check for chain segments first, they're the most efficient way to get
// blocks into the system.
if let Some(item) = chain_segment_queue.pop() {
self.spawn_worker(item, idle_tx);
let work_event: Option<Work<E>> = if let Some(item) =
chain_segment_queue.pop()
{
Some(item)
// Check sync blocks before gossip blocks, since we've already explicitly
// requested these blocks.
} else if let Some(item) = rpc_block_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
} else if let Some(item) = rpc_blob_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
} else if let Some(item) = rpc_custody_column_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
// TODO(das): decide proper prioritization for sampling columns
} else if let Some(item) = rpc_custody_column_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
} else if let Some(item) = rpc_verify_data_column_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
} else if let Some(item) = sampling_result_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
// Check delayed blocks before gossip blocks, the gossip blocks might rely
// on the delayed ones.
} else if let Some(item) = delayed_block_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
// Check gossip blocks before gossip attestations, since a block might be
// required to verify some attestations.
} else if let Some(item) = gossip_block_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
} else if let Some(item) = gossip_blob_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
} else if let Some(item) = gossip_data_column_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
// Check the priority 0 API requests after blocks and blobs, but before attestations.
} else if let Some(item) = api_request_p0_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
// Check the aggregates, *then* the unaggregates since we assume that
// aggregates are more valuable to local validators and effectively give us
// more information with less signature verification time.
@@ -1038,9 +1061,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
if batch_size < 2 {
// One single aggregate is in the queue, process it individually.
if let Some(item) = aggregate_queue.pop() {
self.spawn_worker(item, idle_tx);
}
aggregate_queue.pop()
} else {
// Collect two or more aggregates into a batch, so they can take
// advantage of batch signature verification.
@@ -1071,13 +1092,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
if let Some(process_batch) = process_batch_opt {
// Process all aggregates with a single worker.
self.spawn_worker(
Work::GossipAggregateBatch {
aggregates,
process_batch,
},
idle_tx,
)
Some(Work::GossipAggregateBatch {
aggregates,
process_batch,
})
} else {
// There is no good reason for this to
// happen, it is a serious logic error.
@@ -1085,6 +1103,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
// work items exist, we should always have a
// work closure at this point.
crit!(self.log, "Missing aggregate work");
None
}
}
// Check the unaggregated attestation queue.
@@ -1098,9 +1117,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
if batch_size < 2 {
// One single attestation is in the queue, process it individually.
if let Some(item) = attestation_queue.pop() {
self.spawn_worker(item, idle_tx);
}
attestation_queue.pop()
} else {
// Collect two or more attestations into a batch, so they can take
// advantage of batch signature verification.
@@ -1132,13 +1149,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
if let Some(process_batch) = process_batch_opt {
// Process all attestations with a single worker.
self.spawn_worker(
Work::GossipAttestationBatch {
attestations,
process_batch,
},
idle_tx,
)
Some(Work::GossipAttestationBatch {
attestations,
process_batch,
})
} else {
// There is no good reason for this to
// happen, it is a serious logic error.
@@ -1146,71 +1160,72 @@ impl<E: EthSpec> BeaconProcessor<E> {
// work items exist, we should always have a
// work closure at this point.
crit!(self.log, "Missing attestations work");
None
}
}
// Check sync committee messages after attestations as their rewards are lesser
// and they don't influence fork choice.
} else if let Some(item) = sync_contribution_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
} else if let Some(item) = sync_message_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
// Aggregates and unaggregates queued for re-processing are older and we
// care about fresher ones, so check those first.
} else if let Some(item) = unknown_block_aggregate_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
} else if let Some(item) = unknown_block_attestation_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
// Check RPC methods next. Status messages are needed for sync so
// prioritize them over syncing requests from other peers (BlocksByRange
// and BlocksByRoot)
} else if let Some(item) = status_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
} else if let Some(item) = bbrange_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
} else if let Some(item) = bbroots_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
} else if let Some(item) = blbrange_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
} else if let Some(item) = blbroots_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
} else if let Some(item) = dcbroots_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
} else if let Some(item) = dcbrange_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
// Prioritize sampling requests after block syncing requests
} else if let Some(item) = unknown_block_sampling_request_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
// Check slashings after all other consensus messages so we prioritize
// following head.
//
// Check attester slashings before proposer slashings since they have the
// potential to slash multiple validators at once.
} else if let Some(item) = gossip_attester_slashing_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
} else if let Some(item) = gossip_proposer_slashing_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
// Check exits and address changes late since our validators don't get
// rewards from them.
} else if let Some(item) = gossip_voluntary_exit_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
} else if let Some(item) = gossip_bls_to_execution_change_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
// Check the priority 1 API requests after we've
// processed all the interesting things from the network
// and things required for us to stay in good repute
// with our P2P peers.
} else if let Some(item) = api_request_p1_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
// Handle backfill sync chain segments.
} else if let Some(item) = backfill_chain_segment.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
// Handle light client requests.
} else if let Some(item) = lc_bootstrap_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
} else if let Some(item) = lc_optimistic_update_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
} else if let Some(item) = lc_finality_update_queue.pop() {
self.spawn_worker(item, idle_tx);
Some(item)
// This statement should always be the final else statement.
} else {
// Let the journal know that a worker is freed and there's nothing else
@@ -1220,6 +1235,15 @@ impl<E: EthSpec> BeaconProcessor<E> {
// during testing.
let _ = work_journal_tx.try_send(NOTHING_TO_DO);
}
None
};
if let Some(work_event) = work_event {
let work_type = work_event.to_type();
self.spawn_worker(work_event, idle_tx);
Some(work_type)
} else {
None
}
}
// There is no new work event and we are unable to spawn a new worker.
@@ -1231,6 +1255,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
"Unexpected gossip processor condition";
"msg" => "no new work and cannot spawn worker"
);
None
}
// The chain is syncing and this event should be dropped during sync.
Some(work_event)
@@ -1248,11 +1273,13 @@ impl<E: EthSpec> BeaconProcessor<E> {
"msg" => "chain is syncing",
"work_id" => work_id
);
None
}
// There is a new work event and the chain is not syncing. Process it or queue
// it.
Some(WorkEvent { work, .. }) => {
let work_id = work.str_id();
let work_type = work.to_type();
match work {
_ if can_spawn => self.spawn_worker(work, idle_tx),
@@ -1371,94 +1398,76 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::ApiRequestP1 { .. } => {
api_request_p1_queue.push(work, work_id, &self.log)
}
}
};
Some(work_type)
}
}
};
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_WORKERS_ACTIVE_TOTAL,
self.current_workers as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_QUEUE_TOTAL,
attestation_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_QUEUE_TOTAL,
aggregate_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_SYNC_MESSAGE_QUEUE_TOTAL,
sync_message_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_SYNC_CONTRIBUTION_QUEUE_TOTAL,
sync_contribution_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_QUEUE_TOTAL,
gossip_block_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_GOSSIP_BLOB_QUEUE_TOTAL,
gossip_blob_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_QUEUE_TOTAL,
gossip_data_column_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL,
rpc_block_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_RPC_BLOB_QUEUE_TOTAL,
rpc_blob_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_RPC_CUSTODY_COLUMN_QUEUE_TOTAL,
rpc_custody_column_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_RPC_VERIFY_DATA_COLUMN_QUEUE_TOTAL,
rpc_verify_data_column_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_SAMPLING_RESULT_QUEUE_TOTAL,
sampling_result_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL,
chain_segment_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_QUEUE_TOTAL,
backfill_chain_segment.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_EXIT_QUEUE_TOTAL,
gossip_voluntary_exit_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_QUEUE_TOTAL,
gossip_proposer_slashing_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_QUEUE_TOTAL,
gossip_attester_slashing_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_QUEUE_TOTAL,
gossip_bls_to_execution_change_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_API_REQUEST_P0_QUEUE_TOTAL,
api_request_p0_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_API_REQUEST_P1_QUEUE_TOTAL,
api_request_p1_queue.len() as i64,
);
if let Some(modified_queue_id) = modified_queue_id {
let queue_len = match modified_queue_id {
WorkType::GossipAttestation => aggregate_queue.len(),
WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(),
WorkType::GossipAttestationBatch => 0, // No queue
WorkType::GossipAggregate => aggregate_queue.len(),
WorkType::UnknownBlockAggregate => unknown_block_aggregate_queue.len(),
WorkType::UnknownLightClientOptimisticUpdate => {
unknown_light_client_update_queue.len()
}
WorkType::UnknownBlockSamplingRequest => {
unknown_block_sampling_request_queue.len()
}
WorkType::GossipAggregateBatch => 0, // No queue
WorkType::GossipBlock => gossip_block_queue.len(),
WorkType::GossipBlobSidecar => gossip_blob_queue.len(),
WorkType::GossipDataColumnSidecar => gossip_data_column_queue.len(),
WorkType::DelayedImportBlock => delayed_block_queue.len(),
WorkType::GossipVoluntaryExit => gossip_voluntary_exit_queue.len(),
WorkType::GossipProposerSlashing => gossip_proposer_slashing_queue.len(),
WorkType::GossipAttesterSlashing => gossip_attester_slashing_queue.len(),
WorkType::GossipSyncSignature => sync_message_queue.len(),
WorkType::GossipSyncContribution => sync_contribution_queue.len(),
WorkType::GossipLightClientFinalityUpdate => finality_update_queue.len(),
WorkType::GossipLightClientOptimisticUpdate => {
optimistic_update_queue.len()
}
WorkType::RpcBlock => rpc_block_queue.len(),
WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(),
WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(),
WorkType::RpcVerifyDataColumn => rpc_verify_data_column_queue.len(),
WorkType::SamplingResult => sampling_result_queue.len(),
WorkType::ChainSegment => chain_segment_queue.len(),
WorkType::ChainSegmentBackfill => backfill_chain_segment.len(),
WorkType::Status => status_queue.len(),
WorkType::BlocksByRangeRequest => blbrange_queue.len(),
WorkType::BlocksByRootsRequest => blbroots_queue.len(),
WorkType::BlobsByRangeRequest => bbrange_queue.len(),
WorkType::BlobsByRootsRequest => bbroots_queue.len(),
WorkType::DataColumnsByRootsRequest => dcbroots_queue.len(),
WorkType::DataColumnsByRangeRequest => dcbrange_queue.len(),
WorkType::GossipBlsToExecutionChange => {
gossip_bls_to_execution_change_queue.len()
}
WorkType::LightClientBootstrapRequest => lc_bootstrap_queue.len(),
WorkType::LightClientOptimisticUpdateRequest => {
lc_optimistic_update_queue.len()
}
WorkType::LightClientFinalityUpdateRequest => {
lc_finality_update_queue.len()
}
WorkType::ApiRequestP0 => api_request_p0_queue.len(),
WorkType::ApiRequestP1 => api_request_p1_queue.len(),
};
metrics::observe_vec(
&metrics::BEACON_PROCESSOR_QUEUE_LENGTH,
&[modified_queue_id.into()],
queue_len as f64,
);
}
if aggregate_queue.is_full() && aggregate_debounce.elapsed() {
error!(

View File

@@ -62,163 +62,16 @@ pub static BEACON_PROCESSOR_EVENT_HANDLING_SECONDS: LazyLock<Result<Histogram>>
"Time spent handling a new message and allocating it to a queue or worker.",
)
});
// Gossip blocks.
pub static BEACON_PROCESSOR_GOSSIP_BLOCK_QUEUE_TOTAL: LazyLock<Result<IntGauge>> =
LazyLock::new(|| {
try_create_int_gauge(
"beacon_processor_gossip_block_queue_total",
"Count of blocks from gossip waiting to be verified.",
)
});
// Gossip blobs.
pub static BEACON_PROCESSOR_GOSSIP_BLOB_QUEUE_TOTAL: LazyLock<Result<IntGauge>> =
LazyLock::new(|| {
try_create_int_gauge(
"beacon_processor_gossip_blob_queue_total",
"Count of blobs from gossip waiting to be verified.",
)
});
// Gossip data column sidecars.
pub static BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_QUEUE_TOTAL: LazyLock<Result<IntGauge>> =
LazyLock::new(|| {
try_create_int_gauge(
"beacon_processor_gossip_data_column_queue_total",
"Count of data column sidecars from gossip waiting to be verified.",
)
});
// Gossip Exits.
pub static BEACON_PROCESSOR_EXIT_QUEUE_TOTAL: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
try_create_int_gauge(
"beacon_processor_exit_queue_total",
"Count of exits from gossip waiting to be verified.",
pub static BEACON_PROCESSOR_QUEUE_LENGTH: LazyLock<Result<HistogramVec>> = LazyLock::new(|| {
try_create_histogram_vec_with_buckets(
"beacon_processor_work_event_queue_length",
"Count of work events in queue waiting to be processed.",
Ok(vec![
0.0, 1.0, 4.0, 16.0, 64.0, 256.0, 1024.0, 4096.0, 16384.0, 65536.0,
]),
&["type"],
)
});
// Gossip proposer slashings.
pub static BEACON_PROCESSOR_PROPOSER_SLASHING_QUEUE_TOTAL: LazyLock<Result<IntGauge>> =
LazyLock::new(|| {
try_create_int_gauge(
"beacon_processor_proposer_slashing_queue_total",
"Count of proposer slashings from gossip waiting to be verified.",
)
});
// Gossip attester slashings.
pub static BEACON_PROCESSOR_ATTESTER_SLASHING_QUEUE_TOTAL: LazyLock<Result<IntGauge>> =
LazyLock::new(|| {
try_create_int_gauge(
"beacon_processor_attester_slashing_queue_total",
"Count of attester slashings from gossip waiting to be verified.",
)
});
// Gossip BLS to execution changes.
pub static BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_QUEUE_TOTAL: LazyLock<Result<IntGauge>> =
LazyLock::new(|| {
try_create_int_gauge(
"beacon_processor_bls_to_execution_change_queue_total",
"Count of address changes from gossip waiting to be verified.",
)
});
// Rpc blocks.
pub static BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL: LazyLock<Result<IntGauge>> =
LazyLock::new(|| {
try_create_int_gauge(
"beacon_processor_rpc_block_queue_total",
"Count of blocks from the rpc waiting to be verified.",
)
});
// Rpc blobs.
pub static BEACON_PROCESSOR_RPC_BLOB_QUEUE_TOTAL: LazyLock<Result<IntGauge>> =
LazyLock::new(|| {
try_create_int_gauge(
"beacon_processor_rpc_blob_queue_total",
"Count of blobs from the rpc waiting to be verified.",
)
});
// Rpc custody data columns.
pub static BEACON_PROCESSOR_RPC_CUSTODY_COLUMN_QUEUE_TOTAL: LazyLock<Result<IntGauge>> =
LazyLock::new(|| {
try_create_int_gauge(
"beacon_processor_rpc_custody_column_queue_total",
"Count of custody columns from the rpc waiting to be imported.",
)
});
// Rpc verify data columns
pub static BEACON_PROCESSOR_RPC_VERIFY_DATA_COLUMN_QUEUE_TOTAL: LazyLock<Result<IntGauge>> =
LazyLock::new(|| {
try_create_int_gauge(
"beacon_processor_rpc_verify_data_column_queue_total",
"Count of data columns from the rpc waiting to be verified.",
)
});
// Sampling result
pub static BEACON_PROCESSOR_SAMPLING_RESULT_QUEUE_TOTAL: LazyLock<Result<IntGauge>> =
LazyLock::new(|| {
try_create_int_gauge(
"beacon_processor_sampling_result_queue_total",
"Count of sampling results waiting to be processed.",
)
});
// Chain segments.
pub static BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL: LazyLock<Result<IntGauge>> =
LazyLock::new(|| {
try_create_int_gauge(
"beacon_processor_chain_segment_queue_total",
"Count of chain segments from the rpc waiting to be verified.",
)
});
pub static BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_QUEUE_TOTAL: LazyLock<Result<IntGauge>> =
LazyLock::new(|| {
try_create_int_gauge(
"beacon_processor_backfill_chain_segment_queue_total",
"Count of backfill chain segments from the rpc waiting to be verified.",
)
});
// Unaggregated attestations.
pub static BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_QUEUE_TOTAL: LazyLock<Result<IntGauge>> =
LazyLock::new(|| {
try_create_int_gauge(
"beacon_processor_unaggregated_attestation_queue_total",
"Count of unagg. attestations waiting to be processed.",
)
});
// Aggregated attestations.
pub static BEACON_PROCESSOR_AGGREGATED_ATTESTATION_QUEUE_TOTAL: LazyLock<Result<IntGauge>> =
LazyLock::new(|| {
try_create_int_gauge(
"beacon_processor_aggregated_attestation_queue_total",
"Count of agg. attestations waiting to be processed.",
)
});
// Sync committee messages.
pub static BEACON_PROCESSOR_SYNC_MESSAGE_QUEUE_TOTAL: LazyLock<Result<IntGauge>> =
LazyLock::new(|| {
try_create_int_gauge(
"beacon_processor_sync_message_queue_total",
"Count of sync committee messages waiting to be processed.",
)
});
// Sync contribution.
pub static BEACON_PROCESSOR_SYNC_CONTRIBUTION_QUEUE_TOTAL: LazyLock<Result<IntGauge>> =
LazyLock::new(|| {
try_create_int_gauge(
"beacon_processor_sync_contribution_queue_total",
"Count of sync committee contributions waiting to be processed.",
)
});
// HTTP API requests.
pub static BEACON_PROCESSOR_API_REQUEST_P0_QUEUE_TOTAL: LazyLock<Result<IntGauge>> =
LazyLock::new(|| {
try_create_int_gauge(
"beacon_processor_api_request_p0_queue_total",
"Count of P0 HTTP requesets waiting to be processed.",
)
});
pub static BEACON_PROCESSOR_API_REQUEST_P1_QUEUE_TOTAL: LazyLock<Result<IntGauge>> =
LazyLock::new(|| {
try_create_int_gauge(
"beacon_processor_api_request_p1_queue_total",
"Count of P1 HTTP requesets waiting to be processed.",
)
});
/*
* Attestation reprocessing queue metrics.